2020 03 08 filter sync (#1209)

* Create FilterSync, which gives us an API inside of the chain project to sync filters with

Add another unit test to filter sync

Add more unit tests for ChainSync and FilterSync

Clean up some docs, remove some extra lines of code

Run scalafmt

Add filter-sync.md

Cleanup some nits

Add more information of how FilterSync.syncFilters() works

Add 'FilterWithHeaderHash' type so that we can actually validate/verify block headers that are being fed into the chain project

Run scalafmt, hide imports in filter-sync.md so code appears cleaner

Move implicits out of invisible block as it seems to cause errors

Make it so FilterSync processes filters in batches rather than fetching them all at once

Fix compile error

* Add comment about trust model

* Run scalafmt
This commit is contained in:
Chris Stewart 2020-03-10 18:01:14 -05:00 committed by GitHub
parent 6ed12884ac
commit 4c9a22f1e1
23 changed files with 832 additions and 86 deletions

View file

@ -3,6 +3,10 @@ package org.bitcoins.rpc.client.common
import java.io.File import java.io.File
import akka.actor.ActorSystem import akka.actor.ActorSystem
import org.bitcoins.rpc.client.v16.BitcoindV16RpcClient
import org.bitcoins.rpc.client.v17.BitcoindV17RpcClient
import org.bitcoins.rpc.client.v18.BitcoindV18RpcClient
import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient
import org.bitcoins.rpc.config.{BitcoindConfig, BitcoindInstance} import org.bitcoins.rpc.config.{BitcoindConfig, BitcoindInstance}
/** /**
@ -81,6 +85,22 @@ object BitcoindRpcClient {
val cli = BitcoindRpcClient(instance) val cli = BitcoindRpcClient(instance)
cli cli
} }
/** Returns a bitcoind with the appropriated version you passed in, the bitcoind is NOT started. */
def fromVersion(version: BitcoindVersion, instance: BitcoindInstance)(
implicit system: ActorSystem): BitcoindRpcClient = {
val bitcoind = version match {
case BitcoindVersion.V16 => BitcoindV16RpcClient.withActorSystem(instance)
case BitcoindVersion.V17 => BitcoindV17RpcClient.withActorSystem(instance)
case BitcoindVersion.V18 => BitcoindV18RpcClient.withActorSystem(instance)
case BitcoindVersion.V19 => BitcoindV19RpcClient.withActorSystem(instance)
case BitcoindVersion.Experimental | BitcoindVersion.Unknown =>
sys.error(
s"Cannot create a bitcoind from a unknown or experimental version")
}
bitcoind
}
} }
sealed trait BitcoindVersion sealed trait BitcoindVersion

View file

@ -2,8 +2,9 @@ package org.bitcoins.chain.blockchain.sync
import akka.actor.ActorSystem import akka.actor.ActorSystem
import org.bitcoins.chain.api.ChainApi import org.bitcoins.chain.api.ChainApi
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.core.crypto.DoubleSha256DigestBE import org.bitcoins.core.crypto.DoubleSha256DigestBE
import org.bitcoins.testkit.chain.ChainUnitTest import org.bitcoins.testkit.chain.{ChainUnitTest, SyncUtil}
import org.bitcoins.testkit.chain.fixture.BitcoindChainHandlerViaRpc import org.bitcoins.testkit.chain.fixture.BitcoindChainHandlerViaRpc
import org.scalatest.FutureOutcome import org.scalatest.FutureOutcome
@ -26,13 +27,9 @@ class ChainSyncTest extends ChainUnitTest {
val bitcoind = bitcoindWithChainHandler.bitcoindRpc val bitcoind = bitcoindWithChainHandler.bitcoindRpc
val chainHandler = bitcoindWithChainHandler.chainHandler val chainHandler = bitcoindWithChainHandler.chainHandler
//first we need to implement the 'getBestBlockHashFunc' and 'getBlockHeaderFunc' functions //first we need to implement the 'getBestBlockHashFunc' and 'getBlockHeaderFunc' functions
val getBestBlockHashFunc = { () => val getBestBlockHashFunc = SyncUtil.getBestBlockHashFunc(bitcoind)
bitcoind.getBestBlockHash
}
val getBlockHeaderFunc = { hash: DoubleSha256DigestBE => val getBlockHeaderFunc = SyncUtil.getBlockHeaderFunc(bitcoind)
bitcoind.getBlockHeader(hash).map(_.blockHeader)
}
//let's generate a block on bitcoind //let's generate a block on bitcoind
val block1F = val block1F =
@ -72,4 +69,53 @@ class ChainSyncTest extends ChainUnitTest {
chainHandler.getBlockCount.map(count => assert(count == 0)) chainHandler.getBlockCount.map(count => assert(count == 0))
} }
} }
it must "be able to call sync() twice and not fail when nothing has happened" in {
bitcoindWithChainHandler =>
val bitcoind = bitcoindWithChainHandler.bitcoindRpc
val chainHandler = bitcoindWithChainHandler.chainHandler
//first we need to implement the 'getBestBlockHashFunc' and 'getBlockHeaderFunc' functions
val getBestBlockHashFunc = SyncUtil.getBestBlockHashFunc(bitcoind)
val getBlockHeaderFunc = SyncUtil.getBlockHeaderFunc(bitcoind)
val generate1F = for {
addr <- bitcoind.getNewAddress
hashes <- bitcoind.generateToAddress(1, addr)
} yield hashes
val sync1F: Future[ChainApi] = generate1F.flatMap { _ =>
ChainSync.sync(chainHandler = chainHandler,
getBlockHeaderFunc = getBlockHeaderFunc,
getBestBlockHashFunc = getBestBlockHashFunc)
}
val assertion1F = for {
hashes <- generate1F
chainApiSync1 <- sync1F
count <- chainApiSync1.getBlockCount()
bestHash <- chainApiSync1.getBestBlockHash()
} yield {
assert(count == 1)
assert(bestHash == hashes.head)
}
//let's call sync again and make sure nothing bad happens
val sync2F = for {
_ <- assertion1F
chainApiSync1 <- sync1F
chainApiSync2 <- ChainSync.sync(
chainHandler = chainApiSync1.asInstanceOf[ChainHandler],
getBlockHeaderFunc = getBlockHeaderFunc,
getBestBlockHashFunc = getBestBlockHashFunc)
count <- chainApiSync2.getBlockCount()
hashes <- generate1F
bestHash <- chainApiSync2.getBestBlockHash()
} yield {
assert(count == 1)
assert(bestHash == hashes.head)
}
sync2F
}
} }

View file

@ -0,0 +1,127 @@
package org.bitcoins.chain.blockchain.sync
import org.bitcoins.chain.api.ChainApi
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.core.gcs.{FilterType, GolombFilter}
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.testkit.chain.fixture.BitcoindV19ChainHandler
import org.bitcoins.testkit.chain.{ChainUnitTest, SyncUtil}
import org.scalatest.FutureOutcome
import scala.concurrent.Future
class FilterSyncTest extends ChainUnitTest {
override type FixtureParam = BitcoindV19ChainHandler
override def withFixture(test: OneArgAsyncTest): FutureOutcome = {
withBitcoindV19ChainHandlerViaRpc(test)
}
behavior of "FilterSync"
it must "sync 1 filter header from an external data source" in { fixture =>
val BitcoindV19ChainHandler(bitcoind, chainHandler) = fixture
val initFilterCountF = chainHandler.getFilterCount
val initFilterHeaderCountF = chainHandler.getFilterHeaderCount
val initAssertionsF = for {
initFilterCount <- initFilterCountF
initFilterHeaderCount <- initFilterHeaderCountF
} yield {
assert(initFilterCount == 0)
assert(initFilterHeaderCount == 0)
}
val generated1BlockF = for {
_ <- initAssertionsF
addr <- bitcoind.getNewAddress
hashes <- bitcoind.generateToAddress(1, addr)
} yield hashes
val syncedF = generated1BlockF.flatMap { _ =>
syncHelper(fixture)
}
for {
syncedChainApi <- syncedF
filterHeaderCount <- syncedChainApi.getFilterHeaderCount()
_ = assert(filterHeaderCount == 1)
filterCount <- syncedChainApi.getFilterCount()
} yield assert(filterCount == 1)
}
it must "sync a bunch of filter headers from an external data source" in {
fixture =>
val BitcoindV19ChainHandler(bitcoind, chainHandler) = fixture
val numBlocks = 100
val generatedBlocksF = for {
addr <- bitcoind.getNewAddress
hashes <- bitcoind.generateToAddress(numBlocks, addr)
} yield hashes
val syncedF = generatedBlocksF.flatMap { _ =>
syncHelper(fixture)
}
for {
syncedChainApi <- syncedF
filterHeaderCount <- syncedChainApi.getFilterHeaderCount()
_ = assert(filterHeaderCount == numBlocks)
filterCount <- syncedChainApi.getFilterCount()
} yield assert(filterCount == numBlocks)
}
it must "be able to call filterSync() and not fail when nothing has happened" in {
fixture =>
val BitcoindV19ChainHandler(bitcoind, chainHandler) = fixture
val generated1BlockF = for {
addr <- bitcoind.getNewAddress
hashes <- bitcoind.generateToAddress(1, addr)
} yield hashes
val synced1F = generated1BlockF.flatMap { _ =>
syncHelper(fixture)
}
val sync2F = synced1F.flatMap { chainApi =>
syncHelper(
fixture.copy(chainHandler = chainApi.asInstanceOf[ChainHandler]))
}
for {
syncedChainApi <- sync2F
filterHeaderCount <- syncedChainApi.getFilterHeaderCount()
_ = assert(filterHeaderCount == 1)
filterCount <- syncedChainApi.getFilterCount()
} yield assert(filterCount == 1)
}
private def syncHelper(
bitcoindV19ChainHandler: BitcoindV19ChainHandler): Future[ChainApi] = {
val filterType = FilterType.Basic
val BitcoindV19ChainHandler(bitcoind, chainHandler) =
bitcoindV19ChainHandler
val getBestBlockHashFunc = SyncUtil.getBestBlockHashFunc(bitcoind)
val getBlockHeaderFunc = SyncUtil.getBlockHeaderFunc(bitcoind)
val getFilterFunc: BlockHeader => Future[FilterWithHeaderHash] =
SyncUtil.getFilterFunc(bitcoind, filterType)
//first sync the chain
val syncedHeadersF = ChainSync.sync(chainHandler = chainHandler,
getBlockHeaderFunc = getBlockHeaderFunc,
getBestBlockHashFunc =
getBestBlockHashFunc)
//now sync filters
syncedHeadersF.flatMap { syncedChainHandler =>
FilterSync.syncFilters(
chainApi = syncedChainHandler,
getFilterFunc = getFilterFunc
)
}
}
}

View file

@ -120,6 +120,12 @@ trait ChainApi extends ChainQueryApi {
def getFilterHeadersAtHeight( def getFilterHeadersAtHeight(
height: Int): Future[Vector[CompactFilterHeaderDb]] height: Int): Future[Vector[CompactFilterHeaderDb]]
/** Finds the "best" filter header we have stored in our database
* What this means in practice is the latest filter header we
* have received from our peer.
* */
def getBestFilterHeader(): Future[CompactFilterHeaderDb]
/** /**
* Looks up a compact filter header by its hash. * Looks up a compact filter header by its hash.
*/ */
@ -141,4 +147,9 @@ trait ChainApi extends ChainQueryApi {
/** Returns the block height of the given block stamp */ /** Returns the block height of the given block stamp */
def getHeightByBlockStamp(blockStamp: BlockStamp): Future[Int] def getHeightByBlockStamp(blockStamp: BlockStamp): Future[Int]
/** Fetchs the block headers between from (exclusive) and to (inclusive) */
def getHeadersBetween(
from: BlockHeaderDb,
to: BlockHeaderDb): Future[Vector[BlockHeaderDb]]
} }

View file

@ -193,9 +193,9 @@ case class ChainHandler(
filterHeaders: Vector[FilterHeader], filterHeaders: Vector[FilterHeader],
stopHash: DoubleSha256DigestBE): Future[ChainApi] = { stopHash: DoubleSha256DigestBE): Future[ChainApi] = {
val filterHeadersToCreateF = for { val filterHeadersToCreateF: Future[Vector[CompactFilterHeaderDb]] = for {
blockHeaders <- blockHeaderDAO blockHeaders <- blockHeaderDAO
.getNChildren(stopHash, filterHeaders.size - 1) .getNChildren(ancestorHash = stopHash, n = filterHeaders.size - 1)
.map(_.sortBy(_.height)) .map(_.sortBy(_.height))
} yield { } yield {
if (blockHeaders.size != filterHeaders.size) { if (blockHeaders.size != filterHeaders.size) {
@ -214,17 +214,24 @@ case class ChainHandler(
for { for {
filterHeadersToCreate <- filterHeadersToCreateF filterHeadersToCreate <- filterHeadersToCreateF
_ <- if (filterHeadersToCreate.nonEmpty && filterHeadersToCreate.head.height > 0) { _ <- if (filterHeadersToCreate.nonEmpty && filterHeadersToCreate.head.height > 0) {
filterHeaderDAO val firstFilter = filterHeadersToCreate.head
.findByHash(filterHeadersToCreate.head.previousFilterHeaderBE) val filterHashFOpt = filterHeaderDAO
.map { prevHeaderOpt => .findByHash(firstFilter.previousFilterHeaderBE)
filterHashFOpt.map {
case Some(prevHeader) =>
require( require(
prevHeaderOpt.nonEmpty, prevHeader.height == firstFilter.height - 1,
s"Previous filter header does not exist: ${filterHeadersToCreate.head.previousFilterHeaderBE}") s"Unexpected previous header's height: ${prevHeader.height} != ${filterHeadersToCreate.head.height - 1}"
require(
prevHeaderOpt.get.height == filterHeadersToCreate.head.height - 1,
s"Unexpected previous header's height: ${prevHeaderOpt.get.height} != ${filterHeadersToCreate.head.height - 1}"
) )
} case None =>
if (firstFilter.previousFilterHeaderBE == DoubleSha256DigestBE.empty && firstFilter.height == 0) {
//we are ok, according to BIP157 the previous the genesis filter's prev hash should
//be the empty hash
()
} else {
sys.error(s"Previous filter header does not exist: $firstFilter")
}
}
} else FutureUtil.unit } else FutureUtil.unit
_ <- filterHeaderDAO.createAll(filterHeadersToCreate) _ <- filterHeaderDAO.createAll(filterHeadersToCreate)
} yield this } yield this
@ -347,6 +354,32 @@ case class ChainHandler(
height: Int): Future[Vector[CompactFilterHeaderDb]] = height: Int): Future[Vector[CompactFilterHeaderDb]] =
filterHeaderDAO.getAtHeight(height) filterHeaderDAO.getAtHeight(height)
/** @inheritdoc */
override def getBestFilterHeader(): Future[CompactFilterHeaderDb] = {
//this seems realy brittle, is there a guarantee
//that the highest filter header count is our best filter header?
val filterCountF = getFilterHeaderCount()
val ourBestFilterHeader = for {
count <- filterCountF
filterHeader <- getFilterHeadersAtHeight(count)
} yield {
//TODO: Figure out what the best way to select
//the best filter header is if we have competing
//chains. (Same thing applies to getBestBlockHash()
//for now, just do the dumb thing and pick the first one
filterHeader match {
case tip1 +: _ +: _ =>
tip1
case tip +: _ =>
tip
case Vector() =>
sys.error(s"No filter headers found in database!")
}
}
ourBestFilterHeader
}
/** @inheritdoc */ /** @inheritdoc */
override def getFilterHeader( override def getFilterHeader(
blockHash: DoubleSha256DigestBE): Future[Option[CompactFilterHeaderDb]] = blockHash: DoubleSha256DigestBE): Future[Option[CompactFilterHeaderDb]] =
@ -410,6 +443,28 @@ case class ChainHandler(
.map(dbos => .map(dbos =>
dbos.map(dbo => dbos.map(dbo =>
FilterResponse(dbo.golombFilter, dbo.blockHashBE, dbo.height))) FilterResponse(dbo.golombFilter, dbo.blockHashBE, dbo.height)))
/** @inheritdoc */
override def getHeadersBetween(
from: BlockHeaderDb,
to: BlockHeaderDb): Future[Vector[BlockHeaderDb]] = {
logger.info(s"Finding headers from=$from to=$to")
def loop(
currentF: Future[BlockHeaderDb],
accum: Vector[BlockHeaderDb]): Future[Vector[BlockHeaderDb]] = {
currentF.flatMap { current =>
if (current.previousBlockHashBE == from.hashBE) {
Future.successful(current +: accum)
} else {
val nextOptF = getHeader(current.previousBlockHashBE)
val nextF = nextOptF.map(_.getOrElse(
sys.error(s"Could not find header=${current.previousBlockHashBE}")))
loop(nextF, current +: accum)
}
}
}
loop(Future.successful(to), Vector.empty)
}
} }
object ChainHandler { object ChainHandler {

View file

@ -1,16 +1,16 @@
package org.bitcoins.chain.blockchain.sync package org.bitcoins.chain.blockchain.sync
import org.bitcoins.chain.ChainVerificationLogger
import org.bitcoins.chain.api.ChainApi import org.bitcoins.chain.api.ChainApi
import org.bitcoins.chain.blockchain.ChainHandler import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDb import org.bitcoins.chain.models.BlockHeaderDb
import org.bitcoins.core.crypto.DoubleSha256DigestBE import org.bitcoins.core.crypto.DoubleSha256DigestBE
import org.bitcoins.core.protocol.blockchain.BlockHeader import org.bitcoins.core.protocol.blockchain.BlockHeader
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.ChainVerificationLogger
trait ChainSync extends ChainVerificationLogger { abstract class ChainSync extends ChainVerificationLogger {
/** This method checks if our chain handler has the tip of the blockchain as an external source /** This method checks if our chain handler has the tip of the blockchain as an external source
* If we do not have the same chain, we sync our chain handler until we are at the same best block hash * If we do not have the same chain, we sync our chain handler until we are at the same best block hash
@ -74,12 +74,11 @@ trait ChainSync extends ChainVerificationLogger {
require(tips.nonEmpty, s"Cannot sync without the genesis block") require(tips.nonEmpty, s"Cannot sync without the genesis block")
//we need to walk backwards on the chain until we get to one of our tips //we need to walk backwards on the chain until we get to one of our tips
val tipsBH = tips.map(_.blockHeader) val tipsBH = tips.map(_.blockHeader)
def loop( def loop(
lastHeaderF: Future[BlockHeader], lastHeaderF: Future[BlockHeader],
accum: List[BlockHeader]): Future[List[BlockHeader]] = { accum: Vector[BlockHeader]): Future[Vector[BlockHeader]] = {
lastHeaderF.flatMap { lastHeader => lastHeaderF.flatMap { lastHeader =>
if (tipsBH.contains(lastHeader)) { if (tipsBH.contains(lastHeader)) {
//means we have synced back to a block that we know //means we have synced back to a block that we know
@ -117,18 +116,17 @@ trait ChainSync extends ChainVerificationLogger {
} else { } else {
//this represents all headers we have received from our external data source //this represents all headers we have received from our external data source
//and need to process with our chain handler //and need to process with our chain handler
val headersToSyncF = loop(bestHeaderF, List.empty) val headersToSyncF = loop(bestHeaderF, Vector.empty)
//now we are going to add them to our chain and return the chain api //now we are going to add them to our chain and return the chain api
headersToSyncF.flatMap { headers => headersToSyncF.flatMap { headers =>
logger.info( logger.info(
s"Attempting to sync ${headers.length} blockheader to our chainstate") s"Attempting to sync ${headers.length} blockheader to our chainstate")
chainApi.processHeaders(headers.toVector) chainApi.processHeaders(headers)
} }
} }
} }
} }
} }

View file

@ -0,0 +1,224 @@
package org.bitcoins.chain.blockchain.sync
import org.bitcoins.chain.ChainVerificationLogger
import org.bitcoins.chain.api.ChainApi
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.{BlockHeaderDb, CompactFilterHeaderDb}
import org.bitcoins.core.gcs.{FilterHeader, GolombFilter}
import org.bitcoins.core.p2p.CompactFilterMessage
import org.bitcoins.core.protocol.blockchain.BlockHeader
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
/** A class that is meant to expose and api to sync
* [[GolombFilter]]s and [[FilterHeader]]s from an external
* data source. The important thing to implement is
* {{{
* getFilterFunc: BlockHeader => Future[GolombFilter]
* }}}
* which will allow us to sync our internal filters against.
*
* It should be noted you are entirely trusting the provider
* of the `getFilterFunc` as you aren't able to validate the result
* against another peer that as BIP157 specifies
*
* @see [[https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki#client-operation]]
* */
abstract class FilterSync extends ChainVerificationLogger {
def syncFilters(
chainApi: ChainApi,
getFilterFunc: BlockHeader => Future[FilterWithHeaderHash],
batchSize: Int = 25)(
implicit ec: ExecutionContext,
chainAppConfig: ChainAppConfig): Future[ChainApi] = {
val ourBestFilterHeaderF = chainApi.getBestFilterHeader()
val ourBestBlockHeaderF = chainApi.getBestBlockHeader()
for {
ours <- ourBestFilterHeaderF
ourBestBlockHeader <- ourBestBlockHeaderF
syncedChainApi <- syncFiltersToTip(chainApi = chainApi,
ourBestHeader = ourBestBlockHeader,
ourBestFilterHeader = ours,
getFilterFunc = getFilterFunc,
batchSize)
} yield {
syncedChainApi
}
}
private case class BlockFilterAggregated(
filterHeader: FilterHeader,
filter: GolombFilter,
blockHeader: BlockHeader)
/**
* Syncs our best filter header to our best block hash
* @param chainApi our current chain state
* @param ourBestHeader the block header we are going to sync filters up until
* @param ourBestFilterHeader the best filter header we have
* @param getFilterFunc given a block hash it retrieves filter associated with that hash from our external source
* @param ec
* @return
*/
private def syncFiltersToTip(
chainApi: ChainApi,
ourBestHeader: BlockHeaderDb,
ourBestFilterHeader: CompactFilterHeaderDb,
getFilterFunc: BlockHeader => Future[FilterWithHeaderHash],
batchSize: Int)(
implicit ec: ExecutionContext,
chainAppConfig: ChainAppConfig): Future[ChainApi] = {
if (ourBestFilterHeader.blockHashBE == ourBestHeader.hashBE) {
logger.info(
s"Our filters are synced with our peers filters, both at blockHash=${ourBestFilterHeader.blockHashBE}")
Future.successful(chainApi)
} else {
logger.info(
s"Beginning sync for filters from filterheader=${ourBestFilterHeader} to blockheader=${ourBestHeader.hashBE}")
//let's fetch all missing filter headers first
val bestFilterBlockHeaderF =
chainApi.getHeader(ourBestFilterHeader.blockHashBE)
val headersMissingFiltersF = for {
bestFilterBlockHeader <- bestFilterBlockHeaderF
missing <- chainApi.getHeadersBetween(from = bestFilterBlockHeader.get,
to = ourBestHeader)
} yield {
missing
}
//because filters can be really large, we don't want to process too many
//at once, so batch them in groups and the process them.
val groupedHeadersF: Future[Iterator[Vector[BlockHeaderDb]]] = for {
missing <- headersMissingFiltersF
} yield missing.grouped(batchSize)
val init = Future.successful(chainApi)
for {
groupedHeaders <- groupedHeadersF
finalChainApi <- {
groupedHeaders.foldLeft(init) {
case (apiF, missingHeaders) =>
for {
api <- apiF
bestFilter <- api.getBestFilterHeader()
newApi <- fetchFiltersForHeaderGroup(api,
missingHeaders,
bestFilter,
getFilterFunc)
} yield newApi
}
}
} yield finalChainApi
}
}
private def fetchFiltersForHeaderGroup(
chainApi: ChainApi,
missingHeaders: Vector[BlockHeaderDb],
ourBestFilterHeader: CompactFilterHeaderDb,
getFilterFunc: BlockHeader => Future[FilterWithHeaderHash])(
implicit ec: ExecutionContext,
chainAppConfig: ChainAppConfig): Future[ChainApi] = {
//now that we have headers that are missing filters, let's fetch the filters
val fetchNested = missingHeaders.map { b =>
val filterF = getFilterFunc(b.blockHeader)
filterF.map(f => (b, f))
}
val fetchFiltersF: Future[Vector[(BlockHeaderDb, FilterWithHeaderHash)]] = {
Future.sequence(fetchNested)
}
//now let's build filter headers
val blockFiltersAggF: Future[Vector[BlockFilterAggregated]] = {
fetchFiltersF.map {
case filters: Vector[(BlockHeaderDb, FilterWithHeaderHash)] =>
buildBlockFilterAggregated(filters, ourBestFilterHeader)
}
}
val compactFiltersF = blockFiltersAggF.map { filtersAgg =>
filtersAgg.map { agg =>
CompactFilterMessage(blockHash = agg.blockHeader.hash,
filter = agg.filter)
}
}
val blockHeaderOptF = blockFiltersAggF.map { filtersAgg =>
filtersAgg.lastOption.map(_.blockHeader)
}
val filterHeadersF = blockFiltersAggF.map(_.map(_.filterHeader))
for {
blockHeaderOpt <- blockHeaderOptF
compactFilters <- compactFiltersF
filterHeaders <- filterHeadersF
filtersChainApi <- {
blockHeaderOpt match {
case None =>
logger.info(
s"We did not have a block header to process filter headers with! filterHeaders=${filterHeaders} " +
s"compactFilters=${compactFilters} ourBestFilterHeader=${ourBestFilterHeader}")
Future.successful(chainApi)
case Some(blockHeader) =>
for {
headersChainApi <- chainApi.processFilterHeaders(
filterHeaders,
blockHeader.hashBE)
filtersChainApi <- headersChainApi.processFilters(compactFilters)
} yield filtersChainApi
}
}
} yield {
filtersChainApi
}
}
/** This builds a [[BlockFilterAggregated]] data structure
* and verifies that the filter header hash from an external
* data source matches the hash of the header we generated internally.
* If the hash does not match, someone is likely feeding you a bad header chain.
* */
private def buildBlockFilterAggregated(
filters: Vector[(BlockHeaderDb, FilterWithHeaderHash)],
ourBestFilterHeader: CompactFilterHeaderDb): Vector[
BlockFilterAggregated] = {
val accum = new mutable.ArrayBuffer[BlockFilterAggregated](filters.length)
filters.foreach {
case (blockHeaderDb, filterWithHash) =>
val FilterWithHeaderHash(filter, expectedHeaderHash) = filterWithHash
val filterHeader = if (accum.isEmpty) {
//first header to connect with our internal headers
//that have already been validated
FilterHeader(filterHash = filter.hash,
prevHeaderHash = ourBestFilterHeader.hashBE.flip)
} else {
//get previous filter header's hash
val prevHeaderHash = accum.last.filterHeader.hash
FilterHeader(filterHash = filter.hash,
prevHeaderHash = prevHeaderHash)
}
if (filterHeader.hash == expectedHeaderHash.flip) {
val agg = BlockFilterAggregated(filterHeader,
filter,
blockHeaderDb.blockHeader)
accum.append(agg)
} else {
sys.error(
s"The header we created was different from the expected hash we received " +
s"from an external data source! Something is wrong. Our filterHeader=${filterHeader} expectedHash=$expectedHeaderHash")
}
}
accum.toVector
}
}
object FilterSync extends FilterSync

View file

@ -0,0 +1,11 @@
package org.bitcoins.chain.blockchain.sync
import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.core.gcs.GolombFilter
/** Represents a [[GolombFilter]] with it's [[org.bitcoins.core.gcs.FilterHeader]] associated with it
* This is needed because bitcoin core's 'getblockfilter' rpc returns things in this structure
* */
case class FilterWithHeaderHash(
filter: GolombFilter,
headerHash: DoubleSha256DigestBE)

View file

@ -141,7 +141,7 @@ case class BlockHeaderDAO()(
} }
} }
/** Gets Block Headers of all childred starting with the given block hash (inclusive), could be out of order */ /** Gets Block Headers of all children starting with the given block hash (inclusive), could be out of order */
def getNChildren( def getNChildren(
ancestorHash: DoubleSha256DigestBE, ancestorHash: DoubleSha256DigestBE,
n: Int): Future[Vector[BlockHeaderDb]] = { n: Int): Future[Vector[BlockHeaderDb]] = {

View file

@ -14,6 +14,10 @@ case class CompactFilterHeaderDb(
def filterHeader: FilterHeader = def filterHeader: FilterHeader =
FilterHeader(filterHashBE.flip, previousFilterHeaderBE.flip) FilterHeader(filterHashBE.flip, previousFilterHeaderBE.flip)
override def toString: String = {
s"CompactFilterDb(hashBE=$hashBE,filterHashBE=$filterHashBE,previousFilterHeaderBE=$previousFilterHeaderBE,blockHashBE=$blockHashBE,height=$height)"
}
} }
object CompactFilterHeaderDbHelper { object CompactFilterHeaderDbHelper {
@ -21,7 +25,7 @@ object CompactFilterHeaderDbHelper {
def fromFilterHeader( def fromFilterHeader(
filterHeader: FilterHeader, filterHeader: FilterHeader,
blockHash: DoubleSha256DigestBE, blockHash: DoubleSha256DigestBE,
height: Int) = height: Int): CompactFilterHeaderDb =
CompactFilterHeaderDb( CompactFilterHeaderDb(
hashBE = filterHeader.hash.flip, hashBE = filterHeader.hash.flip,
filterHashBE = filterHeader.filterHash.flip, filterHashBE = filterHeader.filterHash.flip,

View file

@ -13,10 +13,17 @@ case class CompactFilterDb(
bytes: ByteVector, bytes: ByteVector,
height: Int, height: Int,
blockHashBE: DoubleSha256DigestBE) { blockHashBE: DoubleSha256DigestBE) {
require(
CryptoUtil.doubleSHA256(bytes).flip == hashBE,
s"Bytes must hash to hashBE! It looks like you didn't construct CompactFilterDb correctly")
def golombFilter: GolombFilter = filterType match { def golombFilter: GolombFilter = filterType match {
case FilterType.Basic => BlockFilter.fromBytes(bytes, blockHashBE.flip) case FilterType.Basic => BlockFilter.fromBytes(bytes, blockHashBE.flip)
} }
override def toString: String = {
s"CompactFilterDb(hashBE=$hashBE,filterType=$filterType,height=$height,blockHashBE=$blockHashBE,bytes=${bytes})"
}
} }
object CompactFilterDbHelper { object CompactFilterDbHelper {

View file

@ -32,6 +32,7 @@ trait ChainQueryApi {
def getFiltersBetweenHeights( def getFiltersBetweenHeights(
startHeight: Int, startHeight: Int,
endHeight: Int): Future[Vector[FilterResponse]] endHeight: Int): Future[Vector[FilterResponse]]
} }
object ChainQueryApi { object ChainQueryApi {

View file

@ -1,6 +1,6 @@
package org.bitcoins.core.gcs package org.bitcoins.core.gcs
import org.bitcoins.core.crypto.DoubleSha256Digest import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.core.util.CryptoUtil import org.bitcoins.core.util.CryptoUtil
/** /**
@ -25,4 +25,17 @@ case class FilterHeader(
def nextHeader(nextFilterHash: DoubleSha256Digest): FilterHeader = { def nextHeader(nextFilterHash: DoubleSha256Digest): FilterHeader = {
FilterHeader(filterHash = nextFilterHash, prevHeaderHash = this.hash) FilterHeader(filterHash = nextFilterHash, prevHeaderHash = this.hash)
} }
override def toString: String = {
s"FilterHeader(hashBE=${hash.flip},filterHashBE=${filterHash.flip.hex},prevHeaderHashBE=${prevHeaderHash.flip.hex})"
}
}
object FilterHeader {
def apply(
filterHash: DoubleSha256DigestBE,
prevHeaderHash: DoubleSha256DigestBE): FilterHeader = {
new FilterHeader(filterHash.flip, prevHeaderHash.flip)
}
} }

View file

@ -1,6 +1,6 @@
package org.bitcoins.core.gcs package org.bitcoins.core.gcs
import org.bitcoins.core.crypto.DoubleSha256Digest import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.core.number.{UInt64, UInt8} import org.bitcoins.core.number.{UInt64, UInt8}
import org.bitcoins.core.protocol.{CompactSizeUInt, NetworkElement} import org.bitcoins.core.protocol.{CompactSizeUInt, NetworkElement}
import org.bitcoins.core.util.CryptoUtil import org.bitcoins.core.util.CryptoUtil
@ -26,6 +26,10 @@ case class GolombFilter(
CryptoUtil.doubleSHA256(this.bytes) CryptoUtil.doubleSHA256(this.bytes)
} }
lazy val hashBE: DoubleSha256DigestBE = {
hash.flip
}
/** Given the previous FilterHeader, constructs the header corresponding to this */ /** Given the previous FilterHeader, constructs the header corresponding to this */
def getHeader(prevHeader: FilterHeader): FilterHeader = { def getHeader(prevHeader: FilterHeader): FilterHeader = {
FilterHeader(filterHash = this.hash, prevHeaderHash = prevHeader.hash) FilterHeader(filterHash = this.hash, prevHeaderHash = prevHeader.hash)

View file

@ -43,14 +43,14 @@ val rpcCli = BitcoindRpcClient(bitcoindInstance)
// Next, we need to create a way to monitor the chain: // Next, we need to create a way to monitor the chain:
val getBestBlockHash = ChainTestUtil.bestBlockHashFnRpc(Future.successful(rpcCli)) val getBestBlockHash = SyncUtil.getBestBlockHashFunc(rpcCli)
val getBlockHeader = ChainTestUtil.getBlockHeaderFnRpc(Future.successful(rpcCli)) val getBlockHeader = SyncUtil.getBlockHeaderFunc(rpcCli)
// set a data directory // set a data directory
val datadir = Files.createTempDirectory("bitcoin-s-test") val datadir = Files.createTempDirectory("bitcoin-s-test")
// set the currenet network to regtest // set the current network to regtest
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
val config = ConfigFactory.parseString { val config = ConfigFactory.parseString {
""" """
@ -68,9 +68,11 @@ val blockHeaderDAO = BlockHeaderDAO()
val compactFilterHeaderDAO = CompactFilterHeaderDAO() val compactFilterHeaderDAO = CompactFilterHeaderDAO()
val compactFilterDAO = CompactFilterDAO() val compactFilterDAO = CompactFilterDAO()
// Now, do the actual syncing:
//initialize the chain handler from the database
val chainHandlerF = ChainHandler.fromDatabase(blockHeaderDAO, compactFilterHeaderDAO, compactFilterDAO) val chainHandlerF = ChainHandler.fromDatabase(blockHeaderDAO, compactFilterHeaderDAO, compactFilterDAO)
// Now, do the actual syncing:
val syncedChainApiF = for { val syncedChainApiF = for {
_ <- chainProjectInitF _ <- chainProjectInitF
handler <- chainHandlerF handler <- chainHandlerF

View file

@ -0,0 +1,133 @@
---
title: Syncing Blockfilters
id: filter-sync
---
The `chain` module has the ability to store [BIP157](https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki) block filters locally. Generally these filters are useful
for doing wallet rescans. The idea is you can generate a list of script pubkeys you are interested in and see if
the block filter matches the scriptPubKey.
As we demonstrated in [chain.md](chain.md) with block headers, you can sync block filters from an external data source
as well. We are going to use bitcoind as an example of an external data source to sync filters against. It is important
that the bitcoind version you are using is >= `v19` as the [`getblockfilter`](https://github.com/bitcoin/bitcoin/blob/master/doc/release-notes/release-notes-0.19.0.1.md#new-rpcs)
rpc is implemented there. You need to make sure bitcoind is started with the `-blockfilterindex` flag. This makes it
so we can query filters.
#### Abstract idea of syncing filters.
Our internal infrastructure depends on one function to be implemented to be able to sync filters.
```scala mdoc:invisible
import org.bitcoins.core.protocol.blockchain._
import org.bitcoins.core.gcs._
import scala.concurrent.Future
import akka.actor.ActorSystem
import org.bitcoins.core.gcs._
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.chain.blockchain._
import org.bitcoins.chain.blockchain.sync._
import org.bitcoins.chain.models._
import org.bitcoins.chain.config._
import org.bitcoins.rpc.client.common._
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.chain._
import org.bitcoins.testkit.chain.fixture.BitcoindV19ChainHandler
import scala.concurrent._
```
```scala mdoc:compile-only
val getFilterFunc: BlockHeader => Future[FilterWithHeaderHash] = ???
```
With `getFilterFunc` given a `BlockHeader` we can find it's associated `GolombFilter` -- which is our internal repesentation
of a BIP157 block filter.
The basic idea for `FilterSync.syncFilters()` is to look at our current best block header inside of our `ChainApi.getBestBlockHeader()`
and then check what our best block filter's block hash is with `ChainApi.getBestFilterHeader()`. If the blockfilter returned from our internal
data store is NOT associated with our best block header, we attempt to sync our filter headers to catch up to our best block header.
### Syncing block filters against bitcoind
We are going to implement `getFilterFunc` with bitcoind and then sync a few filter headers.
```scala mdoc:compile-only
implicit val system = ActorSystem(s"filter-sync-example")
implicit val ec = system.dispatcher
implicit val chainAppConfig = BitcoinSTestAppConfig.getNeutrinoTestConfig().chainConf
//let's use a helper method to get a v19 bitcoind
//instance and a chainApi
val bitcoindWithChainApiF: Future[BitcoindV19ChainHandler] = {
ChainUnitTest.createBitcoindV19ChainHandler()
}
val bitcoindF = bitcoindWithChainApiF.map(_.bitcoind)
val chainApiF = bitcoindWithChainApiF.map(_.chainHandler)
val filterType = FilterType.Basic
val addressF = bitcoindF.flatMap(_.getNewAddress)
//this is the function that we are going to use to sync
//our internal filters against. We use this function to query
//for each block filter associated with a blockheader
val getFilterFunc: BlockHeader => Future[FilterWithHeaderHash] = { blockHeader =>
val prevFilterResultF =
bitcoindF.flatMap(_.getBlockFilter(blockHeader.hashBE, filterType))
prevFilterResultF.map { filterResult =>
FilterWithHeaderHash(filterResult.filter, filterResult.header)
}
}
//ok enough setup, let's generate a block that we need to sync the filter for in bitcoind
val block1F = for {
bitcoind <- bitcoindF
address <- addressF
hashes <- bitcoind.generateToAddress(1,address)
} yield hashes
//to be able to sync filters, we need to make sure our block headers are synced first
//so let's sync our block headers to our internal chainstate
val chainApiSyncedHeadersF = for {
bitcoind <- bitcoindF
handler <- chainApiF
getBestBlockHash = SyncUtil.getBestBlockHashFunc(bitcoind)
getBlockHeader = SyncUtil.getBlockHeaderFunc(bitcoind)
syncedChainApiHeaders <- ChainSync.sync(handler, getBlockHeader, getBestBlockHash)
} yield syncedChainApiHeaders
//now that we have synced our 1 block header, we can now sync the 1 block filter
//associated with that header.
val chainApiSyncedFiltersF = for {
syncedHeadersChainApi <- chainApiSyncedHeadersF
syncedFilters <- FilterSync.syncFilters(syncedHeadersChainApi,getFilterFunc)
} yield syncedFilters
//now we should have synced our one filter, let's make sure we have it
val resultF = for {
chainApi <- chainApiSyncedFiltersF
filterHeaderCount <- chainApi.getFilterHeaderCount()
filterCount <- chainApi.getFilterCount()
} yield {
println(s"filterHeaderCount=$filterHeaderCount filterCount=$filterCount")
}
//cleanup
resultF.onComplete { _ =>
for {
c <- bitcoindWithChainApiF
_ <- ChainUnitTest.destroyBitcoindV19ChainApi(c)
_ <- system.terminate()
} yield ()
}
```
Yay! Now we have synced block filters from an external data source. If you want to repeatedly sync you can just call
`FilterSync.syncFilters(syncedFiltersChainApi,getFilterFunc)` every time you would like to sync. Again, you need to ensure
your headers are synced before you can sync filters, so make sure that you are calling `ChainSync.sync()` before syncing
filters.

View file

@ -1,17 +1,13 @@
package org.bitcoins.testkit.chain package org.bitcoins.testkit.chain
import org.bitcoins.chain.models._ import org.bitcoins.chain.models._
import org.bitcoins.core.crypto import org.bitcoins.core.crypto.DoubleSha256Digest
import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.core.gcs.{BlockFilter, FilterHeader, GolombFilter} import org.bitcoins.core.gcs.{BlockFilter, FilterHeader, GolombFilter}
import org.bitcoins.core.protocol.blockchain.{ import org.bitcoins.core.protocol.blockchain.{
BlockHeader, BlockHeader,
MainNetChainParams, MainNetChainParams,
RegTestNetChainParams RegTestNetChainParams
} }
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import scala.concurrent.{ExecutionContext, Future}
sealed abstract class ChainTestUtil { sealed abstract class ChainTestUtil {
lazy val regTestChainParams: RegTestNetChainParams.type = lazy val regTestChainParams: RegTestNetChainParams.type =
@ -80,20 +76,6 @@ sealed abstract class ChainTestUtil {
lazy val blockHeaderDb566496 = lazy val blockHeaderDb566496 =
BlockHeaderDbHelper.fromBlockHeader(566496, blockHeader566496) BlockHeaderDbHelper.fromBlockHeader(566496, blockHeader566496)
} }
/** Creates a best block header function for [[org.bitcoins.chain.blockchain.sync.ChainSync.sync() ChainSync.sync]] */
def bestBlockHashFnRpc(bitcoindF: Future[BitcoindRpcClient])(
implicit ec: ExecutionContext): () => Future[DoubleSha256DigestBE] = {
() =>
bitcoindF.flatMap(_.getBestBlockHash)
}
/** Creates a getBlocKHeader function for [[org.bitcoins.chain.blockchain.sync.ChainSync.sync() ChainSync.sync]] */
def getBlockHeaderFnRpc(bitcoindF: Future[BitcoindRpcClient])(
implicit ec: ExecutionContext): DoubleSha256DigestBE => Future[
BlockHeader] = { hash: crypto.DoubleSha256DigestBE =>
bitcoindF.flatMap(_.getBlockHeader(hash).map(_.blockHeader))
}
} }
object ChainTestUtil extends ChainTestUtil object ChainTestUtil extends ChainTestUtil

View file

@ -5,13 +5,17 @@ import java.net.InetSocketAddress
import akka.actor.ActorSystem import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.bitcoins.chain.ChainVerificationLogger import org.bitcoins.chain.ChainVerificationLogger
import org.bitcoins.chain.api.ChainApi
import org.bitcoins.chain.blockchain.ChainHandler import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.blockchain.sync.ChainSync
import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.db.ChainDbManagement import org.bitcoins.chain.db.ChainDbManagement
import org.bitcoins.chain.models._ import org.bitcoins.chain.models._
import org.bitcoins.core.crypto.DoubleSha256DigestBE
import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader} import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader}
import org.bitcoins.db.AppConfig import org.bitcoins.db.AppConfig
import org.bitcoins.rpc.client.common.BitcoindRpcClient import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion}
import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient
import org.bitcoins.testkit.chain.fixture._ import org.bitcoins.testkit.chain.fixture._
import org.bitcoins.testkit.fixtures.BitcoinSFixture import org.bitcoins.testkit.fixtures.BitcoinSFixture
import org.bitcoins.testkit.rpc.BitcoindRpcTestUtil import org.bitcoins.testkit.rpc.BitcoindRpcTestUtil
@ -211,19 +215,6 @@ trait ChainUnitTest
} yield (chainHandler, zmqSubscriber) } yield (chainHandler, zmqSubscriber)
} }
def createChainApiWithBitcoindRpc(
bitcoind: BitcoindRpcClient): Future[BitcoindChainHandlerViaRpc] = {
val handlerWithGenesisHeaderF =
ChainUnitTest.setupHeaderTableWithGenesisHeader()
val chainHandlerF = handlerWithGenesisHeaderF.map(_._1)
chainHandlerF.map { handler =>
chain.fixture.BitcoindChainHandlerViaRpc(bitcoind, handler)
}
}
def createBitcoindChainHandlerViaZmq(): Future[BitcoindChainHandlerViaZmq] = { def createBitcoindChainHandlerViaZmq(): Future[BitcoindChainHandlerViaZmq] = {
composeBuildersAndWrap(() => BitcoinSFixture.createBitcoind(), composeBuildersAndWrap(() => BitcoinSFixture.createBitcoind(),
createChainHandlerWithBitcoindZmq, createChainHandlerWithBitcoindZmq,
@ -238,19 +229,11 @@ trait ChainUnitTest
bitcoindChainHandler.bitcoindRpc, bitcoindChainHandler.bitcoindRpc,
bitcoindChainHandler.chainHandler) bitcoindChainHandler.chainHandler)
destroyBitcoindChainApiViaRpc(rpc).map { _ => ChainUnitTest.destroyBitcoindChainApiViaRpc(rpc).map { _ =>
bitcoindChainHandler.zmqSubscriber.stop bitcoindChainHandler.zmqSubscriber.stop
} }
} }
def destroyBitcoindChainApiViaRpc(
bitcoindChainHandler: BitcoindChainHandlerViaRpc): Future[Unit] = {
val stopBitcoindF =
BitcoindRpcTestUtil.stopServer(bitcoindChainHandler.bitcoindRpc)
val dropTableF = ChainUnitTest.destroyAllTables()
stopBitcoindF.flatMap(_ => dropTableF)
}
/** /**
* Creates a [[org.bitcoins.rpc.client.common.BitcoindRpcClient BitcoindRpcClient]] that is linked to our [[org.bitcoins.chain.blockchain.ChainHandler ChainHandler]] * Creates a [[org.bitcoins.rpc.client.common.BitcoindRpcClient BitcoindRpcClient]] that is linked to our [[org.bitcoins.chain.blockchain.ChainHandler ChainHandler]]
* via a [[org.bitcoins.zmq.ZMQSubscriber zmq]]. This means messages are passed between bitcoin and our chain handler * via a [[org.bitcoins.zmq.ZMQSubscriber zmq]]. This means messages are passed between bitcoin and our chain handler
@ -273,10 +256,22 @@ trait ChainUnitTest
def withBitcoindChainHandlerViaRpc(test: OneArgAsyncTest)( def withBitcoindChainHandlerViaRpc(test: OneArgAsyncTest)(
implicit system: ActorSystem): FutureOutcome = { implicit system: ActorSystem): FutureOutcome = {
val builder: () => Future[BitcoindChainHandlerViaRpc] = { () => val builder: () => Future[BitcoindChainHandlerViaRpc] = { () =>
BitcoinSFixture.createBitcoind().flatMap(createChainApiWithBitcoindRpc) BitcoinSFixture
.createBitcoind()
.flatMap(ChainUnitTest.createChainApiWithBitcoindRpc)
} }
makeDependentFixture(builder, destroyBitcoindChainApiViaRpc)(test) makeDependentFixture(builder, ChainUnitTest.destroyBitcoindChainApiViaRpc)(
test)
}
def withBitcoindV19ChainHandlerViaRpc(test: OneArgAsyncTest)(
implicit system: ActorSystem): FutureOutcome = {
val builder: () => Future[BitcoindV19ChainHandler] = { () =>
ChainUnitTest.createBitcoindV19ChainHandler()
}
makeDependentFixture(builder, ChainUnitTest.destroyBitcoindV19ChainApi)(
test)
} }
} }
@ -406,6 +401,54 @@ object ChainUnitTest extends ChainVerificationLogger {
} }
} }
def createChainApiWithBitcoindRpc(bitcoind: BitcoindRpcClient)(
implicit ec: ExecutionContext,
chainAppConfig: ChainAppConfig): Future[BitcoindChainHandlerViaRpc] = {
val handlerWithGenesisHeaderF =
ChainUnitTest.setupHeaderTableWithGenesisHeader()
val chainHandlerF = handlerWithGenesisHeaderF.map(_._1)
chainHandlerF.map { handler =>
chain.fixture.BitcoindChainHandlerViaRpc(bitcoind, handler)
}
}
def destroyBitcoindChainApiViaRpc(
bitcoindChainHandler: BitcoindChainHandlerViaRpc)(
implicit system: ActorSystem,
chainAppConfig: ChainAppConfig): Future[Unit] = {
import system.dispatcher
val stopBitcoindF =
BitcoindRpcTestUtil.stopServer(bitcoindChainHandler.bitcoindRpc)
val dropTableF = ChainUnitTest.destroyAllTables()
stopBitcoindF.flatMap(_ => dropTableF)
}
def createBitcoindV19ChainHandler()(
implicit system: ActorSystem,
chainAppConfig: ChainAppConfig): Future[BitcoindV19ChainHandler] = {
import system.dispatcher
val bitcoindV = BitcoindVersion.V19
BitcoinSFixture
.createBitcoind(Some(bitcoindV))
.flatMap(createChainApiWithBitcoindRpc)
.map { b: BitcoindChainHandlerViaRpc =>
BitcoindV19ChainHandler(
b.bitcoindRpc.asInstanceOf[BitcoindV19RpcClient],
b.chainHandler)
}
}
def destroyBitcoindV19ChainApi(
bitcoindV19ChainHandler: BitcoindV19ChainHandler)(
implicit system: ActorSystem,
chainAppConfig: ChainAppConfig): Future[Unit] = {
val b = BitcoindChainHandlerViaRpc(bitcoindV19ChainHandler.bitcoind,
bitcoindV19ChainHandler.chainHandler)
destroyBitcoindChainApiViaRpc(b)
}
def destroyBitcoind(bitcoind: BitcoindRpcClient)( def destroyBitcoind(bitcoind: BitcoindRpcClient)(
implicit system: ActorSystem): Future[Unit] = { implicit system: ActorSystem): Future[Unit] = {
BitcoindRpcTestUtil.stopServer(bitcoind) BitcoindRpcTestUtil.stopServer(bitcoind)
@ -465,4 +508,20 @@ object ChainUnitTest extends ChainVerificationLogger {
} }
/** Syncs the given chain handler to the given bitcoind */
def syncFromBitcoind(bitcoind: BitcoindRpcClient, chainHandler: ChainHandler)(
implicit ec: ExecutionContext,
chainAppConfig: ChainAppConfig): Future[ChainApi] = {
//sync headers
//first we need to implement the 'getBestBlockHashFunc' and 'getBlockHeaderFunc' functions
val getBestBlockHashFunc = { () =>
bitcoind.getBestBlockHash
}
val getBlockHeaderFunc = { hash: DoubleSha256DigestBE =>
bitcoind.getBlockHeader(hash).map(_.blockHeader)
}
ChainSync.sync(chainHandler, getBlockHeaderFunc, getBestBlockHashFunc)
}
} }

View file

@ -0,0 +1,43 @@
package org.bitcoins.testkit.chain
import org.bitcoins.chain.blockchain.sync.FilterWithHeaderHash
import org.bitcoins.core.crypto.DoubleSha256DigestBE
import org.bitcoins.core.gcs.{FilterType, GolombFilter}
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient
import org.bitcoins.rpc.jsonmodels.GetBlockFilterResult
import scala.concurrent.{ExecutionContext, Future}
/** Useful utilities to use in the chain project for syncing things against bitcoind */
abstract class SyncUtil {
/** Creates a function that will retrun bitcoin's best block hash when called */
def getBestBlockHashFunc(
bitcoind: BitcoindRpcClient): () => Future[DoubleSha256DigestBE] = { () =>
bitcoind.getBestBlockHash
}
/** Creates a function that you can pass a hash to and it returns the block header */
def getBlockHeaderFunc(bitcoind: BitcoindRpcClient)(
implicit ec: ExecutionContext): DoubleSha256DigestBE => Future[
BlockHeader] = { hash: DoubleSha256DigestBE =>
bitcoind.getBlockHeader(hash).map(_.blockHeader)
}
/** Creates a function that you can pass a block header to and it's return's it's [[GolombFilter]] */
def getFilterFunc(bitcoind: BitcoindV19RpcClient, filterType: FilterType)(
implicit ec: ExecutionContext): BlockHeader => Future[
FilterWithHeaderHash] = {
case header: BlockHeader =>
val prevFilterResultF =
bitcoind.getBlockFilter(header.hashBE, filterType)
prevFilterResultF.map {
case GetBlockFilterResult(filter, header) =>
FilterWithHeaderHash(filter, header)
}
}
}
object SyncUtil extends SyncUtil

View file

@ -3,7 +3,7 @@ package org.bitcoins.testkit.chain.fixture
import org.bitcoins.chain.blockchain.ChainHandler import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.rpc.client.common.BitcoindRpcClient import org.bitcoins.rpc.client.common.BitcoindRpcClient
/** Represents a bitcoind instance paired with a chain handler via zmq */ /** Represents a bitcoind instance paired with a chain handler via rpc */
case class BitcoindChainHandlerViaRpc( case class BitcoindChainHandlerViaRpc(
bitcoindRpc: BitcoindRpcClient, bitcoindRpc: BitcoindRpcClient,
chainHandler: ChainHandler) chainHandler: ChainHandler)

View file

@ -150,10 +150,11 @@ object BitcoinSFixture {
/** Creates a new bitcoind instance */ /** Creates a new bitcoind instance */
def createBitcoind(versionOpt: Option[BitcoindVersion] = None)( def createBitcoind(versionOpt: Option[BitcoindVersion] = None)(
implicit system: ActorSystem): Future[BitcoindRpcClient] = { implicit system: ActorSystem): Future[BitcoindRpcClient] = {
import system.dispatcher
val instance = BitcoindRpcTestUtil.instance(versionOpt = versionOpt) val instance = BitcoindRpcTestUtil.instance(versionOpt = versionOpt)
val bitcoind = BitcoindRpcClient.withActorSystem(instance) val bitcoind = versionOpt match {
case Some(v) => BitcoindRpcClient.fromVersion(v, instance)
bitcoind.start().map(_ => bitcoind) case None => new BitcoindRpcClient(instance)
}
bitcoind.start()
} }
} }

View file

@ -49,7 +49,9 @@ private[wallet] trait RescanHandling extends WalletLogger {
addressBatchSize: Int): Future[Unit] = { addressBatchSize: Int): Future[Unit] = {
for { for {
scriptPubKeys <- generateScriptPubKeys(addressBatchSize) scriptPubKeys <- generateScriptPubKeys(addressBatchSize)
blocks <- matchBlocks(scriptPubKeys, endOpt, startOpt) blocks <- matchBlocks(scriptPubKeys = scriptPubKeys,
endOpt = endOpt,
startOpt = startOpt)
_ <- downloadAndProcessBlocks(blocks) _ <- downloadAndProcessBlocks(blocks)
externalGap <- calcAddressGap(HDChainType.External) externalGap <- calcAddressGap(HDChainType.External)
changeGap <- calcAddressGap(HDChainType.Change) changeGap <- calcAddressGap(HDChainType.Change)
@ -119,8 +121,10 @@ private[wallet] trait RescanHandling extends WalletLogger {
Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors() * 2) Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors() * 2)
val blocksF = for { val blocksF = for {
blocks <- getMatchingBlocks(scriptPubKeys, startOpt, endOpt)( blocks <- getMatchingBlocks(
ExecutionContext.fromExecutor(threadPool)) scripts = scriptPubKeys,
startOpt = startOpt,
endOpt = endOpt)(ExecutionContext.fromExecutor(threadPool))
} yield { } yield {
blocks.sortBy(_.blockHeight).map(_.blockHash.flip) blocks.sortBy(_.blockHeight).map(_.blockHash.flip)
} }

View file

@ -22,6 +22,7 @@
], ],
"Applications": [ "Applications": [
"applications/chain", "applications/chain",
"applications/filter-sync",
"applications/cli", "applications/cli",
"applications/configuration", "applications/configuration",
"applications/dlc", "applications/dlc",