From 4c9a22f1e15aa71cb9b4b02af9725bb27d0a8e26 Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Tue, 10 Mar 2020 18:01:14 -0500 Subject: [PATCH] 2020 03 08 filter sync (#1209) * Create FilterSync, which gives us an API inside of the chain project to sync filters with Add another unit test to filter sync Add more unit tests for ChainSync and FilterSync Clean up some docs, remove some extra lines of code Run scalafmt Add filter-sync.md Cleanup some nits Add more information of how FilterSync.syncFilters() works Add 'FilterWithHeaderHash' type so that we can actually validate/verify block headers that are being fed into the chain project Run scalafmt, hide imports in filter-sync.md so code appears cleaner Move implicits out of invisible block as it seems to cause errors Make it so FilterSync processes filters in batches rather than fetching them all at once Fix compile error * Add comment about trust model * Run scalafmt --- .../rpc/client/common/BitcoindRpcClient.scala | 20 ++ .../chain/blockchain/sync/ChainSyncTest.scala | 60 ++++- .../blockchain/sync/FilterSyncTest.scala | 127 ++++++++++ .../org/bitcoins/chain/api/ChainApi.scala | 11 + .../chain/blockchain/ChainHandler.scala | 77 +++++- .../chain/blockchain/sync/ChainSync.scala | 14 +- .../chain/blockchain/sync/FilterSync.scala | 224 ++++++++++++++++++ .../sync/FilterWithHeaderHash.scala | 11 + .../chain/models/BlockHeaderDAO.scala | 2 +- .../models/CompactFilterHeaderTable.scala | 6 +- .../chain/models/CompactFilterTable.scala | 7 + .../org/bitcoins/core/api/ChainQueryApi.scala | 1 + .../org/bitcoins/core/gcs/FilterHeader.scala | 15 +- .../org/bitcoins/core/gcs/GolombFilter.scala | 6 +- docs/applications/chain.md | 10 +- docs/applications/filter-sync.md | 133 +++++++++++ .../testkit/chain/ChainTestUtil.scala | 20 +- .../testkit/chain/ChainUnitTest.scala | 109 +++++++-- .../org/bitcoins/testkit/chain/SyncUtil.scala | 43 ++++ .../fixture/BitcoindChainHandlerViaRpc.scala | 2 +- .../testkit/fixtures/BitcoinSFixture.scala | 9 +- .../wallet/internal/RescanHandling.scala | 10 +- website/sidebars.json | 1 + 23 files changed, 832 insertions(+), 86 deletions(-) create mode 100644 chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/FilterSyncTest.scala create mode 100644 chain/src/main/scala/org/bitcoins/chain/blockchain/sync/FilterSync.scala create mode 100644 chain/src/main/scala/org/bitcoins/chain/blockchain/sync/FilterWithHeaderHash.scala create mode 100644 docs/applications/filter-sync.md create mode 100644 testkit/src/main/scala/org/bitcoins/testkit/chain/SyncUtil.scala diff --git a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala index 6e9c209981..3b1a7fb1bd 100644 --- a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala +++ b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala @@ -3,6 +3,10 @@ package org.bitcoins.rpc.client.common import java.io.File import akka.actor.ActorSystem +import org.bitcoins.rpc.client.v16.BitcoindV16RpcClient +import org.bitcoins.rpc.client.v17.BitcoindV17RpcClient +import org.bitcoins.rpc.client.v18.BitcoindV18RpcClient +import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient import org.bitcoins.rpc.config.{BitcoindConfig, BitcoindInstance} /** @@ -81,6 +85,22 @@ object BitcoindRpcClient { val cli = BitcoindRpcClient(instance) cli } + + /** Returns a bitcoind with the appropriated version you passed in, the bitcoind is NOT started. */ + def fromVersion(version: BitcoindVersion, instance: BitcoindInstance)( + implicit system: ActorSystem): BitcoindRpcClient = { + val bitcoind = version match { + case BitcoindVersion.V16 => BitcoindV16RpcClient.withActorSystem(instance) + case BitcoindVersion.V17 => BitcoindV17RpcClient.withActorSystem(instance) + case BitcoindVersion.V18 => BitcoindV18RpcClient.withActorSystem(instance) + case BitcoindVersion.V19 => BitcoindV19RpcClient.withActorSystem(instance) + case BitcoindVersion.Experimental | BitcoindVersion.Unknown => + sys.error( + s"Cannot create a bitcoind from a unknown or experimental version") + } + + bitcoind + } } sealed trait BitcoindVersion diff --git a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/ChainSyncTest.scala b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/ChainSyncTest.scala index 14c6e59130..208cd34ae4 100644 --- a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/ChainSyncTest.scala +++ b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/ChainSyncTest.scala @@ -2,8 +2,9 @@ package org.bitcoins.chain.blockchain.sync import akka.actor.ActorSystem import org.bitcoins.chain.api.ChainApi +import org.bitcoins.chain.blockchain.ChainHandler import org.bitcoins.core.crypto.DoubleSha256DigestBE -import org.bitcoins.testkit.chain.ChainUnitTest +import org.bitcoins.testkit.chain.{ChainUnitTest, SyncUtil} import org.bitcoins.testkit.chain.fixture.BitcoindChainHandlerViaRpc import org.scalatest.FutureOutcome @@ -26,13 +27,9 @@ class ChainSyncTest extends ChainUnitTest { val bitcoind = bitcoindWithChainHandler.bitcoindRpc val chainHandler = bitcoindWithChainHandler.chainHandler //first we need to implement the 'getBestBlockHashFunc' and 'getBlockHeaderFunc' functions - val getBestBlockHashFunc = { () => - bitcoind.getBestBlockHash - } + val getBestBlockHashFunc = SyncUtil.getBestBlockHashFunc(bitcoind) - val getBlockHeaderFunc = { hash: DoubleSha256DigestBE => - bitcoind.getBlockHeader(hash).map(_.blockHeader) - } + val getBlockHeaderFunc = SyncUtil.getBlockHeaderFunc(bitcoind) //let's generate a block on bitcoind val block1F = @@ -72,4 +69,53 @@ class ChainSyncTest extends ChainUnitTest { chainHandler.getBlockCount.map(count => assert(count == 0)) } } + + it must "be able to call sync() twice and not fail when nothing has happened" in { + bitcoindWithChainHandler => + val bitcoind = bitcoindWithChainHandler.bitcoindRpc + val chainHandler = bitcoindWithChainHandler.chainHandler + //first we need to implement the 'getBestBlockHashFunc' and 'getBlockHeaderFunc' functions + val getBestBlockHashFunc = SyncUtil.getBestBlockHashFunc(bitcoind) + + val getBlockHeaderFunc = SyncUtil.getBlockHeaderFunc(bitcoind) + + val generate1F = for { + addr <- bitcoind.getNewAddress + hashes <- bitcoind.generateToAddress(1, addr) + } yield hashes + + val sync1F: Future[ChainApi] = generate1F.flatMap { _ => + ChainSync.sync(chainHandler = chainHandler, + getBlockHeaderFunc = getBlockHeaderFunc, + getBestBlockHashFunc = getBestBlockHashFunc) + } + + val assertion1F = for { + hashes <- generate1F + chainApiSync1 <- sync1F + count <- chainApiSync1.getBlockCount() + bestHash <- chainApiSync1.getBestBlockHash() + } yield { + assert(count == 1) + assert(bestHash == hashes.head) + } + + //let's call sync again and make sure nothing bad happens + val sync2F = for { + _ <- assertion1F + chainApiSync1 <- sync1F + chainApiSync2 <- ChainSync.sync( + chainHandler = chainApiSync1.asInstanceOf[ChainHandler], + getBlockHeaderFunc = getBlockHeaderFunc, + getBestBlockHashFunc = getBestBlockHashFunc) + count <- chainApiSync2.getBlockCount() + hashes <- generate1F + bestHash <- chainApiSync2.getBestBlockHash() + } yield { + assert(count == 1) + assert(bestHash == hashes.head) + } + + sync2F + } } diff --git a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/FilterSyncTest.scala b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/FilterSyncTest.scala new file mode 100644 index 0000000000..094d5bc28b --- /dev/null +++ b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/FilterSyncTest.scala @@ -0,0 +1,127 @@ +package org.bitcoins.chain.blockchain.sync + +import org.bitcoins.chain.api.ChainApi +import org.bitcoins.chain.blockchain.ChainHandler +import org.bitcoins.core.gcs.{FilterType, GolombFilter} +import org.bitcoins.core.protocol.blockchain.BlockHeader +import org.bitcoins.testkit.chain.fixture.BitcoindV19ChainHandler +import org.bitcoins.testkit.chain.{ChainUnitTest, SyncUtil} +import org.scalatest.FutureOutcome + +import scala.concurrent.Future + +class FilterSyncTest extends ChainUnitTest { + + override type FixtureParam = BitcoindV19ChainHandler + + override def withFixture(test: OneArgAsyncTest): FutureOutcome = { + withBitcoindV19ChainHandlerViaRpc(test) + } + + behavior of "FilterSync" + + it must "sync 1 filter header from an external data source" in { fixture => + val BitcoindV19ChainHandler(bitcoind, chainHandler) = fixture + + val initFilterCountF = chainHandler.getFilterCount + val initFilterHeaderCountF = chainHandler.getFilterHeaderCount + val initAssertionsF = for { + initFilterCount <- initFilterCountF + initFilterHeaderCount <- initFilterHeaderCountF + } yield { + assert(initFilterCount == 0) + assert(initFilterHeaderCount == 0) + } + + val generated1BlockF = for { + _ <- initAssertionsF + addr <- bitcoind.getNewAddress + hashes <- bitcoind.generateToAddress(1, addr) + } yield hashes + + val syncedF = generated1BlockF.flatMap { _ => + syncHelper(fixture) + } + + for { + syncedChainApi <- syncedF + filterHeaderCount <- syncedChainApi.getFilterHeaderCount() + _ = assert(filterHeaderCount == 1) + filterCount <- syncedChainApi.getFilterCount() + } yield assert(filterCount == 1) + } + + it must "sync a bunch of filter headers from an external data source" in { + fixture => + val BitcoindV19ChainHandler(bitcoind, chainHandler) = fixture + + val numBlocks = 100 + val generatedBlocksF = for { + addr <- bitcoind.getNewAddress + hashes <- bitcoind.generateToAddress(numBlocks, addr) + } yield hashes + + val syncedF = generatedBlocksF.flatMap { _ => + syncHelper(fixture) + } + + for { + syncedChainApi <- syncedF + filterHeaderCount <- syncedChainApi.getFilterHeaderCount() + _ = assert(filterHeaderCount == numBlocks) + filterCount <- syncedChainApi.getFilterCount() + } yield assert(filterCount == numBlocks) + } + + it must "be able to call filterSync() and not fail when nothing has happened" in { + fixture => + val BitcoindV19ChainHandler(bitcoind, chainHandler) = fixture + + val generated1BlockF = for { + addr <- bitcoind.getNewAddress + hashes <- bitcoind.generateToAddress(1, addr) + } yield hashes + + val synced1F = generated1BlockF.flatMap { _ => + syncHelper(fixture) + } + + val sync2F = synced1F.flatMap { chainApi => + syncHelper( + fixture.copy(chainHandler = chainApi.asInstanceOf[ChainHandler])) + } + + for { + syncedChainApi <- sync2F + filterHeaderCount <- syncedChainApi.getFilterHeaderCount() + _ = assert(filterHeaderCount == 1) + filterCount <- syncedChainApi.getFilterCount() + } yield assert(filterCount == 1) + } + + private def syncHelper( + bitcoindV19ChainHandler: BitcoindV19ChainHandler): Future[ChainApi] = { + val filterType = FilterType.Basic + val BitcoindV19ChainHandler(bitcoind, chainHandler) = + bitcoindV19ChainHandler + val getBestBlockHashFunc = SyncUtil.getBestBlockHashFunc(bitcoind) + val getBlockHeaderFunc = SyncUtil.getBlockHeaderFunc(bitcoind) + + val getFilterFunc: BlockHeader => Future[FilterWithHeaderHash] = + SyncUtil.getFilterFunc(bitcoind, filterType) + + //first sync the chain + val syncedHeadersF = ChainSync.sync(chainHandler = chainHandler, + getBlockHeaderFunc = getBlockHeaderFunc, + getBestBlockHashFunc = + getBestBlockHashFunc) + + //now sync filters + syncedHeadersF.flatMap { syncedChainHandler => + FilterSync.syncFilters( + chainApi = syncedChainHandler, + getFilterFunc = getFilterFunc + ) + } + } +} diff --git a/chain/src/main/scala/org/bitcoins/chain/api/ChainApi.scala b/chain/src/main/scala/org/bitcoins/chain/api/ChainApi.scala index 191b6ea060..4636359d20 100644 --- a/chain/src/main/scala/org/bitcoins/chain/api/ChainApi.scala +++ b/chain/src/main/scala/org/bitcoins/chain/api/ChainApi.scala @@ -120,6 +120,12 @@ trait ChainApi extends ChainQueryApi { def getFilterHeadersAtHeight( height: Int): Future[Vector[CompactFilterHeaderDb]] + /** Finds the "best" filter header we have stored in our database + * What this means in practice is the latest filter header we + * have received from our peer. + * */ + def getBestFilterHeader(): Future[CompactFilterHeaderDb] + /** * Looks up a compact filter header by its hash. */ @@ -141,4 +147,9 @@ trait ChainApi extends ChainQueryApi { /** Returns the block height of the given block stamp */ def getHeightByBlockStamp(blockStamp: BlockStamp): Future[Int] + + /** Fetchs the block headers between from (exclusive) and to (inclusive) */ + def getHeadersBetween( + from: BlockHeaderDb, + to: BlockHeaderDb): Future[Vector[BlockHeaderDb]] } diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala index 78a6d13807..c48dd91693 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala @@ -193,9 +193,9 @@ case class ChainHandler( filterHeaders: Vector[FilterHeader], stopHash: DoubleSha256DigestBE): Future[ChainApi] = { - val filterHeadersToCreateF = for { + val filterHeadersToCreateF: Future[Vector[CompactFilterHeaderDb]] = for { blockHeaders <- blockHeaderDAO - .getNChildren(stopHash, filterHeaders.size - 1) + .getNChildren(ancestorHash = stopHash, n = filterHeaders.size - 1) .map(_.sortBy(_.height)) } yield { if (blockHeaders.size != filterHeaders.size) { @@ -214,17 +214,24 @@ case class ChainHandler( for { filterHeadersToCreate <- filterHeadersToCreateF _ <- if (filterHeadersToCreate.nonEmpty && filterHeadersToCreate.head.height > 0) { - filterHeaderDAO - .findByHash(filterHeadersToCreate.head.previousFilterHeaderBE) - .map { prevHeaderOpt => + val firstFilter = filterHeadersToCreate.head + val filterHashFOpt = filterHeaderDAO + .findByHash(firstFilter.previousFilterHeaderBE) + filterHashFOpt.map { + case Some(prevHeader) => require( - prevHeaderOpt.nonEmpty, - s"Previous filter header does not exist: ${filterHeadersToCreate.head.previousFilterHeaderBE}") - require( - prevHeaderOpt.get.height == filterHeadersToCreate.head.height - 1, - s"Unexpected previous header's height: ${prevHeaderOpt.get.height} != ${filterHeadersToCreate.head.height - 1}" + prevHeader.height == firstFilter.height - 1, + s"Unexpected previous header's height: ${prevHeader.height} != ${filterHeadersToCreate.head.height - 1}" ) - } + case None => + if (firstFilter.previousFilterHeaderBE == DoubleSha256DigestBE.empty && firstFilter.height == 0) { + //we are ok, according to BIP157 the previous the genesis filter's prev hash should + //be the empty hash + () + } else { + sys.error(s"Previous filter header does not exist: $firstFilter") + } + } } else FutureUtil.unit _ <- filterHeaderDAO.createAll(filterHeadersToCreate) } yield this @@ -347,6 +354,32 @@ case class ChainHandler( height: Int): Future[Vector[CompactFilterHeaderDb]] = filterHeaderDAO.getAtHeight(height) + /** @inheritdoc */ + override def getBestFilterHeader(): Future[CompactFilterHeaderDb] = { + //this seems realy brittle, is there a guarantee + //that the highest filter header count is our best filter header? + val filterCountF = getFilterHeaderCount() + val ourBestFilterHeader = for { + count <- filterCountF + filterHeader <- getFilterHeadersAtHeight(count) + } yield { + //TODO: Figure out what the best way to select + //the best filter header is if we have competing + //chains. (Same thing applies to getBestBlockHash() + //for now, just do the dumb thing and pick the first one + filterHeader match { + case tip1 +: _ +: _ => + tip1 + case tip +: _ => + tip + case Vector() => + sys.error(s"No filter headers found in database!") + } + } + + ourBestFilterHeader + } + /** @inheritdoc */ override def getFilterHeader( blockHash: DoubleSha256DigestBE): Future[Option[CompactFilterHeaderDb]] = @@ -410,6 +443,28 @@ case class ChainHandler( .map(dbos => dbos.map(dbo => FilterResponse(dbo.golombFilter, dbo.blockHashBE, dbo.height))) + + /** @inheritdoc */ + override def getHeadersBetween( + from: BlockHeaderDb, + to: BlockHeaderDb): Future[Vector[BlockHeaderDb]] = { + logger.info(s"Finding headers from=$from to=$to") + def loop( + currentF: Future[BlockHeaderDb], + accum: Vector[BlockHeaderDb]): Future[Vector[BlockHeaderDb]] = { + currentF.flatMap { current => + if (current.previousBlockHashBE == from.hashBE) { + Future.successful(current +: accum) + } else { + val nextOptF = getHeader(current.previousBlockHashBE) + val nextF = nextOptF.map(_.getOrElse( + sys.error(s"Could not find header=${current.previousBlockHashBE}"))) + loop(nextF, current +: accum) + } + } + } + loop(Future.successful(to), Vector.empty) + } } object ChainHandler { diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/ChainSync.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/ChainSync.scala index 5832d6a82d..640cf99dac 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/ChainSync.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/ChainSync.scala @@ -1,16 +1,16 @@ package org.bitcoins.chain.blockchain.sync +import org.bitcoins.chain.ChainVerificationLogger import org.bitcoins.chain.api.ChainApi import org.bitcoins.chain.blockchain.ChainHandler +import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.chain.models.BlockHeaderDb import org.bitcoins.core.crypto.DoubleSha256DigestBE import org.bitcoins.core.protocol.blockchain.BlockHeader import scala.concurrent.{ExecutionContext, Future} -import org.bitcoins.chain.config.ChainAppConfig -import org.bitcoins.chain.ChainVerificationLogger -trait ChainSync extends ChainVerificationLogger { +abstract class ChainSync extends ChainVerificationLogger { /** This method checks if our chain handler has the tip of the blockchain as an external source * If we do not have the same chain, we sync our chain handler until we are at the same best block hash @@ -74,12 +74,11 @@ trait ChainSync extends ChainVerificationLogger { require(tips.nonEmpty, s"Cannot sync without the genesis block") //we need to walk backwards on the chain until we get to one of our tips - val tipsBH = tips.map(_.blockHeader) def loop( lastHeaderF: Future[BlockHeader], - accum: List[BlockHeader]): Future[List[BlockHeader]] = { + accum: Vector[BlockHeader]): Future[Vector[BlockHeader]] = { lastHeaderF.flatMap { lastHeader => if (tipsBH.contains(lastHeader)) { //means we have synced back to a block that we know @@ -117,18 +116,17 @@ trait ChainSync extends ChainVerificationLogger { } else { //this represents all headers we have received from our external data source //and need to process with our chain handler - val headersToSyncF = loop(bestHeaderF, List.empty) + val headersToSyncF = loop(bestHeaderF, Vector.empty) //now we are going to add them to our chain and return the chain api headersToSyncF.flatMap { headers => logger.info( s"Attempting to sync ${headers.length} blockheader to our chainstate") - chainApi.processHeaders(headers.toVector) + chainApi.processHeaders(headers) } } } - } } diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/FilterSync.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/FilterSync.scala new file mode 100644 index 0000000000..5e02e5e40f --- /dev/null +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/FilterSync.scala @@ -0,0 +1,224 @@ +package org.bitcoins.chain.blockchain.sync + +import org.bitcoins.chain.ChainVerificationLogger +import org.bitcoins.chain.api.ChainApi +import org.bitcoins.chain.config.ChainAppConfig +import org.bitcoins.chain.models.{BlockHeaderDb, CompactFilterHeaderDb} +import org.bitcoins.core.gcs.{FilterHeader, GolombFilter} +import org.bitcoins.core.p2p.CompactFilterMessage +import org.bitcoins.core.protocol.blockchain.BlockHeader + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} + +/** A class that is meant to expose and api to sync + * [[GolombFilter]]s and [[FilterHeader]]s from an external + * data source. The important thing to implement is + * {{{ + * getFilterFunc: BlockHeader => Future[GolombFilter] + * }}} + * which will allow us to sync our internal filters against. + * + * It should be noted you are entirely trusting the provider + * of the `getFilterFunc` as you aren't able to validate the result + * against another peer that as BIP157 specifies + * + * @see [[https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki#client-operation]] + * */ +abstract class FilterSync extends ChainVerificationLogger { + + def syncFilters( + chainApi: ChainApi, + getFilterFunc: BlockHeader => Future[FilterWithHeaderHash], + batchSize: Int = 25)( + implicit ec: ExecutionContext, + chainAppConfig: ChainAppConfig): Future[ChainApi] = { + + val ourBestFilterHeaderF = chainApi.getBestFilterHeader() + val ourBestBlockHeaderF = chainApi.getBestBlockHeader() + for { + ours <- ourBestFilterHeaderF + ourBestBlockHeader <- ourBestBlockHeaderF + syncedChainApi <- syncFiltersToTip(chainApi = chainApi, + ourBestHeader = ourBestBlockHeader, + ourBestFilterHeader = ours, + getFilterFunc = getFilterFunc, + batchSize) + } yield { + syncedChainApi + } + } + + private case class BlockFilterAggregated( + filterHeader: FilterHeader, + filter: GolombFilter, + blockHeader: BlockHeader) + + /** + * Syncs our best filter header to our best block hash + * @param chainApi our current chain state + * @param ourBestHeader the block header we are going to sync filters up until + * @param ourBestFilterHeader the best filter header we have + * @param getFilterFunc given a block hash it retrieves filter associated with that hash from our external source + * @param ec + * @return + */ + private def syncFiltersToTip( + chainApi: ChainApi, + ourBestHeader: BlockHeaderDb, + ourBestFilterHeader: CompactFilterHeaderDb, + getFilterFunc: BlockHeader => Future[FilterWithHeaderHash], + batchSize: Int)( + implicit ec: ExecutionContext, + chainAppConfig: ChainAppConfig): Future[ChainApi] = { + if (ourBestFilterHeader.blockHashBE == ourBestHeader.hashBE) { + logger.info( + s"Our filters are synced with our peers filters, both at blockHash=${ourBestFilterHeader.blockHashBE}") + Future.successful(chainApi) + } else { + logger.info( + s"Beginning sync for filters from filterheader=${ourBestFilterHeader} to blockheader=${ourBestHeader.hashBE}") + //let's fetch all missing filter headers first + val bestFilterBlockHeaderF = + chainApi.getHeader(ourBestFilterHeader.blockHashBE) + + val headersMissingFiltersF = for { + bestFilterBlockHeader <- bestFilterBlockHeaderF + missing <- chainApi.getHeadersBetween(from = bestFilterBlockHeader.get, + to = ourBestHeader) + } yield { + missing + } + + //because filters can be really large, we don't want to process too many + //at once, so batch them in groups and the process them. + val groupedHeadersF: Future[Iterator[Vector[BlockHeaderDb]]] = for { + missing <- headersMissingFiltersF + } yield missing.grouped(batchSize) + + val init = Future.successful(chainApi) + for { + groupedHeaders <- groupedHeadersF + finalChainApi <- { + groupedHeaders.foldLeft(init) { + case (apiF, missingHeaders) => + for { + api <- apiF + bestFilter <- api.getBestFilterHeader() + newApi <- fetchFiltersForHeaderGroup(api, + missingHeaders, + bestFilter, + getFilterFunc) + } yield newApi + } + } + } yield finalChainApi + } + } + + private def fetchFiltersForHeaderGroup( + chainApi: ChainApi, + missingHeaders: Vector[BlockHeaderDb], + ourBestFilterHeader: CompactFilterHeaderDb, + getFilterFunc: BlockHeader => Future[FilterWithHeaderHash])( + implicit ec: ExecutionContext, + chainAppConfig: ChainAppConfig): Future[ChainApi] = { + //now that we have headers that are missing filters, let's fetch the filters + + val fetchNested = missingHeaders.map { b => + val filterF = getFilterFunc(b.blockHeader) + filterF.map(f => (b, f)) + } + + val fetchFiltersF: Future[Vector[(BlockHeaderDb, FilterWithHeaderHash)]] = { + Future.sequence(fetchNested) + } + + //now let's build filter headers + val blockFiltersAggF: Future[Vector[BlockFilterAggregated]] = { + fetchFiltersF.map { + case filters: Vector[(BlockHeaderDb, FilterWithHeaderHash)] => + buildBlockFilterAggregated(filters, ourBestFilterHeader) + } + } + + val compactFiltersF = blockFiltersAggF.map { filtersAgg => + filtersAgg.map { agg => + CompactFilterMessage(blockHash = agg.blockHeader.hash, + filter = agg.filter) + } + } + + val blockHeaderOptF = blockFiltersAggF.map { filtersAgg => + filtersAgg.lastOption.map(_.blockHeader) + } + val filterHeadersF = blockFiltersAggF.map(_.map(_.filterHeader)) + + for { + blockHeaderOpt <- blockHeaderOptF + compactFilters <- compactFiltersF + filterHeaders <- filterHeadersF + filtersChainApi <- { + blockHeaderOpt match { + case None => + logger.info( + s"We did not have a block header to process filter headers with! filterHeaders=${filterHeaders} " + + s"compactFilters=${compactFilters} ourBestFilterHeader=${ourBestFilterHeader}") + Future.successful(chainApi) + case Some(blockHeader) => + for { + headersChainApi <- chainApi.processFilterHeaders( + filterHeaders, + blockHeader.hashBE) + filtersChainApi <- headersChainApi.processFilters(compactFilters) + } yield filtersChainApi + } + } + } yield { + filtersChainApi + } + } + + /** This builds a [[BlockFilterAggregated]] data structure + * and verifies that the filter header hash from an external + * data source matches the hash of the header we generated internally. + * If the hash does not match, someone is likely feeding you a bad header chain. + * */ + private def buildBlockFilterAggregated( + filters: Vector[(BlockHeaderDb, FilterWithHeaderHash)], + ourBestFilterHeader: CompactFilterHeaderDb): Vector[ + BlockFilterAggregated] = { + + val accum = new mutable.ArrayBuffer[BlockFilterAggregated](filters.length) + + filters.foreach { + case (blockHeaderDb, filterWithHash) => + val FilterWithHeaderHash(filter, expectedHeaderHash) = filterWithHash + val filterHeader = if (accum.isEmpty) { + //first header to connect with our internal headers + //that have already been validated + FilterHeader(filterHash = filter.hash, + prevHeaderHash = ourBestFilterHeader.hashBE.flip) + } else { + //get previous filter header's hash + val prevHeaderHash = accum.last.filterHeader.hash + FilterHeader(filterHash = filter.hash, + prevHeaderHash = prevHeaderHash) + } + if (filterHeader.hash == expectedHeaderHash.flip) { + val agg = BlockFilterAggregated(filterHeader, + filter, + blockHeaderDb.blockHeader) + accum.append(agg) + } else { + sys.error( + s"The header we created was different from the expected hash we received " + + s"from an external data source! Something is wrong. Our filterHeader=${filterHeader} expectedHash=$expectedHeaderHash") + } + } + + accum.toVector + } +} + +object FilterSync extends FilterSync diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/FilterWithHeaderHash.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/FilterWithHeaderHash.scala new file mode 100644 index 0000000000..36bc21b5b4 --- /dev/null +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/FilterWithHeaderHash.scala @@ -0,0 +1,11 @@ +package org.bitcoins.chain.blockchain.sync + +import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} +import org.bitcoins.core.gcs.GolombFilter + +/** Represents a [[GolombFilter]] with it's [[org.bitcoins.core.gcs.FilterHeader]] associated with it + * This is needed because bitcoin core's 'getblockfilter' rpc returns things in this structure + * */ +case class FilterWithHeaderHash( + filter: GolombFilter, + headerHash: DoubleSha256DigestBE) diff --git a/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala b/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala index 9461a23c2b..0bc40373b8 100644 --- a/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala +++ b/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala @@ -141,7 +141,7 @@ case class BlockHeaderDAO()( } } - /** Gets Block Headers of all childred starting with the given block hash (inclusive), could be out of order */ + /** Gets Block Headers of all children starting with the given block hash (inclusive), could be out of order */ def getNChildren( ancestorHash: DoubleSha256DigestBE, n: Int): Future[Vector[BlockHeaderDb]] = { diff --git a/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterHeaderTable.scala b/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterHeaderTable.scala index 4305f4f0ad..7995699aa9 100644 --- a/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterHeaderTable.scala +++ b/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterHeaderTable.scala @@ -14,6 +14,10 @@ case class CompactFilterHeaderDb( def filterHeader: FilterHeader = FilterHeader(filterHashBE.flip, previousFilterHeaderBE.flip) + + override def toString: String = { + s"CompactFilterDb(hashBE=$hashBE,filterHashBE=$filterHashBE,previousFilterHeaderBE=$previousFilterHeaderBE,blockHashBE=$blockHashBE,height=$height)" + } } object CompactFilterHeaderDbHelper { @@ -21,7 +25,7 @@ object CompactFilterHeaderDbHelper { def fromFilterHeader( filterHeader: FilterHeader, blockHash: DoubleSha256DigestBE, - height: Int) = + height: Int): CompactFilterHeaderDb = CompactFilterHeaderDb( hashBE = filterHeader.hash.flip, filterHashBE = filterHeader.filterHash.flip, diff --git a/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterTable.scala b/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterTable.scala index afda8f0afb..2f7844db66 100644 --- a/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterTable.scala +++ b/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterTable.scala @@ -13,10 +13,17 @@ case class CompactFilterDb( bytes: ByteVector, height: Int, blockHashBE: DoubleSha256DigestBE) { + require( + CryptoUtil.doubleSHA256(bytes).flip == hashBE, + s"Bytes must hash to hashBE! It looks like you didn't construct CompactFilterDb correctly") def golombFilter: GolombFilter = filterType match { case FilterType.Basic => BlockFilter.fromBytes(bytes, blockHashBE.flip) } + + override def toString: String = { + s"CompactFilterDb(hashBE=$hashBE,filterType=$filterType,height=$height,blockHashBE=$blockHashBE,bytes=${bytes})" + } } object CompactFilterDbHelper { diff --git a/core/src/main/scala/org/bitcoins/core/api/ChainQueryApi.scala b/core/src/main/scala/org/bitcoins/core/api/ChainQueryApi.scala index 61248dfa95..ede0c15532 100644 --- a/core/src/main/scala/org/bitcoins/core/api/ChainQueryApi.scala +++ b/core/src/main/scala/org/bitcoins/core/api/ChainQueryApi.scala @@ -32,6 +32,7 @@ trait ChainQueryApi { def getFiltersBetweenHeights( startHeight: Int, endHeight: Int): Future[Vector[FilterResponse]] + } object ChainQueryApi { diff --git a/core/src/main/scala/org/bitcoins/core/gcs/FilterHeader.scala b/core/src/main/scala/org/bitcoins/core/gcs/FilterHeader.scala index 03c7b37bd7..0493aa72dc 100644 --- a/core/src/main/scala/org/bitcoins/core/gcs/FilterHeader.scala +++ b/core/src/main/scala/org/bitcoins/core/gcs/FilterHeader.scala @@ -1,6 +1,6 @@ package org.bitcoins.core.gcs -import org.bitcoins.core.crypto.DoubleSha256Digest +import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.core.util.CryptoUtil /** @@ -25,4 +25,17 @@ case class FilterHeader( def nextHeader(nextFilterHash: DoubleSha256Digest): FilterHeader = { FilterHeader(filterHash = nextFilterHash, prevHeaderHash = this.hash) } + + override def toString: String = { + s"FilterHeader(hashBE=${hash.flip},filterHashBE=${filterHash.flip.hex},prevHeaderHashBE=${prevHeaderHash.flip.hex})" + } +} + +object FilterHeader { + + def apply( + filterHash: DoubleSha256DigestBE, + prevHeaderHash: DoubleSha256DigestBE): FilterHeader = { + new FilterHeader(filterHash.flip, prevHeaderHash.flip) + } } diff --git a/core/src/main/scala/org/bitcoins/core/gcs/GolombFilter.scala b/core/src/main/scala/org/bitcoins/core/gcs/GolombFilter.scala index d797b269fc..d1379d2fde 100644 --- a/core/src/main/scala/org/bitcoins/core/gcs/GolombFilter.scala +++ b/core/src/main/scala/org/bitcoins/core/gcs/GolombFilter.scala @@ -1,6 +1,6 @@ package org.bitcoins.core.gcs -import org.bitcoins.core.crypto.DoubleSha256Digest +import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.core.number.{UInt64, UInt8} import org.bitcoins.core.protocol.{CompactSizeUInt, NetworkElement} import org.bitcoins.core.util.CryptoUtil @@ -26,6 +26,10 @@ case class GolombFilter( CryptoUtil.doubleSHA256(this.bytes) } + lazy val hashBE: DoubleSha256DigestBE = { + hash.flip + } + /** Given the previous FilterHeader, constructs the header corresponding to this */ def getHeader(prevHeader: FilterHeader): FilterHeader = { FilterHeader(filterHash = this.hash, prevHeaderHash = prevHeader.hash) diff --git a/docs/applications/chain.md b/docs/applications/chain.md index 24d3cacc65..30f9a25f52 100644 --- a/docs/applications/chain.md +++ b/docs/applications/chain.md @@ -43,14 +43,14 @@ val rpcCli = BitcoindRpcClient(bitcoindInstance) // Next, we need to create a way to monitor the chain: -val getBestBlockHash = ChainTestUtil.bestBlockHashFnRpc(Future.successful(rpcCli)) +val getBestBlockHash = SyncUtil.getBestBlockHashFunc(rpcCli) -val getBlockHeader = ChainTestUtil.getBlockHeaderFnRpc(Future.successful(rpcCli)) +val getBlockHeader = SyncUtil.getBlockHeaderFunc(rpcCli) // set a data directory val datadir = Files.createTempDirectory("bitcoin-s-test") -// set the currenet network to regtest +// set the current network to regtest import com.typesafe.config.ConfigFactory val config = ConfigFactory.parseString { """ @@ -68,9 +68,11 @@ val blockHeaderDAO = BlockHeaderDAO() val compactFilterHeaderDAO = CompactFilterHeaderDAO() val compactFilterDAO = CompactFilterDAO() -// Now, do the actual syncing: + +//initialize the chain handler from the database val chainHandlerF = ChainHandler.fromDatabase(blockHeaderDAO, compactFilterHeaderDAO, compactFilterDAO) +// Now, do the actual syncing: val syncedChainApiF = for { _ <- chainProjectInitF handler <- chainHandlerF diff --git a/docs/applications/filter-sync.md b/docs/applications/filter-sync.md new file mode 100644 index 0000000000..561c571c2e --- /dev/null +++ b/docs/applications/filter-sync.md @@ -0,0 +1,133 @@ +--- +title: Syncing Blockfilters +id: filter-sync +--- + +The `chain` module has the ability to store [BIP157](https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki) block filters locally. Generally these filters are useful +for doing wallet rescans. The idea is you can generate a list of script pubkeys you are interested in and see if +the block filter matches the scriptPubKey. + +As we demonstrated in [chain.md](chain.md) with block headers, you can sync block filters from an external data source +as well. We are going to use bitcoind as an example of an external data source to sync filters against. It is important +that the bitcoind version you are using is >= `v19` as the [`getblockfilter`](https://github.com/bitcoin/bitcoin/blob/master/doc/release-notes/release-notes-0.19.0.1.md#new-rpcs) +rpc is implemented there. You need to make sure bitcoind is started with the `-blockfilterindex` flag. This makes it +so we can query filters. + +#### Abstract idea of syncing filters. + +Our internal infrastructure depends on one function to be implemented to be able to sync filters. + +```scala mdoc:invisible +import org.bitcoins.core.protocol.blockchain._ +import org.bitcoins.core.gcs._ +import scala.concurrent.Future + +import akka.actor.ActorSystem + +import org.bitcoins.core.gcs._ +import org.bitcoins.core.protocol.blockchain.BlockHeader +import org.bitcoins.chain.blockchain._ +import org.bitcoins.chain.blockchain.sync._ +import org.bitcoins.chain.models._ +import org.bitcoins.chain.config._ + +import org.bitcoins.rpc.client.common._ +import org.bitcoins.testkit.BitcoinSTestAppConfig +import org.bitcoins.testkit.chain._ +import org.bitcoins.testkit.chain.fixture.BitcoindV19ChainHandler + +import scala.concurrent._ + +``` + +```scala mdoc:compile-only +val getFilterFunc: BlockHeader => Future[FilterWithHeaderHash] = ??? +``` + +With `getFilterFunc` given a `BlockHeader` we can find it's associated `GolombFilter` -- which is our internal repesentation +of a BIP157 block filter. + +The basic idea for `FilterSync.syncFilters()` is to look at our current best block header inside of our `ChainApi.getBestBlockHeader()` +and then check what our best block filter's block hash is with `ChainApi.getBestFilterHeader()`. If the blockfilter returned from our internal +data store is NOT associated with our best block header, we attempt to sync our filter headers to catch up to our best block header. + +### Syncing block filters against bitcoind + +We are going to implement `getFilterFunc` with bitcoind and then sync a few filter headers. + +```scala mdoc:compile-only + +implicit val system = ActorSystem(s"filter-sync-example") +implicit val ec = system.dispatcher +implicit val chainAppConfig = BitcoinSTestAppConfig.getNeutrinoTestConfig().chainConf + +//let's use a helper method to get a v19 bitcoind +//instance and a chainApi +val bitcoindWithChainApiF: Future[BitcoindV19ChainHandler] = { + ChainUnitTest.createBitcoindV19ChainHandler() +} +val bitcoindF = bitcoindWithChainApiF.map(_.bitcoind) +val chainApiF = bitcoindWithChainApiF.map(_.chainHandler) + +val filterType = FilterType.Basic +val addressF = bitcoindF.flatMap(_.getNewAddress) + +//this is the function that we are going to use to sync +//our internal filters against. We use this function to query +//for each block filter associated with a blockheader +val getFilterFunc: BlockHeader => Future[FilterWithHeaderHash] = { blockHeader => + val prevFilterResultF = + bitcoindF.flatMap(_.getBlockFilter(blockHeader.hashBE, filterType)) + prevFilterResultF.map { filterResult => + FilterWithHeaderHash(filterResult.filter, filterResult.header) + } +} + +//ok enough setup, let's generate a block that we need to sync the filter for in bitcoind +val block1F = for { + bitcoind <- bitcoindF + address <- addressF + hashes <- bitcoind.generateToAddress(1,address) +} yield hashes + +//to be able to sync filters, we need to make sure our block headers are synced first +//so let's sync our block headers to our internal chainstate +val chainApiSyncedHeadersF = for { + bitcoind <- bitcoindF + handler <- chainApiF + getBestBlockHash = SyncUtil.getBestBlockHashFunc(bitcoind) + getBlockHeader = SyncUtil.getBlockHeaderFunc(bitcoind) + syncedChainApiHeaders <- ChainSync.sync(handler, getBlockHeader, getBestBlockHash) +} yield syncedChainApiHeaders + +//now that we have synced our 1 block header, we can now sync the 1 block filter +//associated with that header. +val chainApiSyncedFiltersF = for { + syncedHeadersChainApi <- chainApiSyncedHeadersF + syncedFilters <- FilterSync.syncFilters(syncedHeadersChainApi,getFilterFunc) +} yield syncedFilters + +//now we should have synced our one filter, let's make sure we have it +val resultF = for { + chainApi <- chainApiSyncedFiltersF + filterHeaderCount <- chainApi.getFilterHeaderCount() + filterCount <- chainApi.getFilterCount() +} yield { + println(s"filterHeaderCount=$filterHeaderCount filterCount=$filterCount") +} + +//cleanup +resultF.onComplete { _ => + for { + c <- bitcoindWithChainApiF + _ <- ChainUnitTest.destroyBitcoindV19ChainApi(c) + _ <- system.terminate() + } yield () +} +``` + +Yay! Now we have synced block filters from an external data source. If you want to repeatedly sync you can just call + +`FilterSync.syncFilters(syncedFiltersChainApi,getFilterFunc)` every time you would like to sync. Again, you need to ensure +your headers are synced before you can sync filters, so make sure that you are calling `ChainSync.sync()` before syncing +filters. \ No newline at end of file diff --git a/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainTestUtil.scala b/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainTestUtil.scala index 0db1eb4217..d5d0da8d5c 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainTestUtil.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainTestUtil.scala @@ -1,17 +1,13 @@ package org.bitcoins.testkit.chain import org.bitcoins.chain.models._ -import org.bitcoins.core.crypto -import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} +import org.bitcoins.core.crypto.DoubleSha256Digest import org.bitcoins.core.gcs.{BlockFilter, FilterHeader, GolombFilter} import org.bitcoins.core.protocol.blockchain.{ BlockHeader, MainNetChainParams, RegTestNetChainParams } -import org.bitcoins.rpc.client.common.BitcoindRpcClient - -import scala.concurrent.{ExecutionContext, Future} sealed abstract class ChainTestUtil { lazy val regTestChainParams: RegTestNetChainParams.type = @@ -80,20 +76,6 @@ sealed abstract class ChainTestUtil { lazy val blockHeaderDb566496 = BlockHeaderDbHelper.fromBlockHeader(566496, blockHeader566496) } - - /** Creates a best block header function for [[org.bitcoins.chain.blockchain.sync.ChainSync.sync() ChainSync.sync]] */ - def bestBlockHashFnRpc(bitcoindF: Future[BitcoindRpcClient])( - implicit ec: ExecutionContext): () => Future[DoubleSha256DigestBE] = { - () => - bitcoindF.flatMap(_.getBestBlockHash) - } - - /** Creates a getBlocKHeader function for [[org.bitcoins.chain.blockchain.sync.ChainSync.sync() ChainSync.sync]] */ - def getBlockHeaderFnRpc(bitcoindF: Future[BitcoindRpcClient])( - implicit ec: ExecutionContext): DoubleSha256DigestBE => Future[ - BlockHeader] = { hash: crypto.DoubleSha256DigestBE => - bitcoindF.flatMap(_.getBlockHeader(hash).map(_.blockHeader)) - } } object ChainTestUtil extends ChainTestUtil diff --git a/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainUnitTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainUnitTest.scala index 812a2b9773..e2c64b00c9 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainUnitTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainUnitTest.scala @@ -5,13 +5,17 @@ import java.net.InetSocketAddress import akka.actor.ActorSystem import com.typesafe.config.ConfigFactory import org.bitcoins.chain.ChainVerificationLogger +import org.bitcoins.chain.api.ChainApi import org.bitcoins.chain.blockchain.ChainHandler +import org.bitcoins.chain.blockchain.sync.ChainSync import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.chain.db.ChainDbManagement import org.bitcoins.chain.models._ +import org.bitcoins.core.crypto.DoubleSha256DigestBE import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader} import org.bitcoins.db.AppConfig -import org.bitcoins.rpc.client.common.BitcoindRpcClient +import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion} +import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient import org.bitcoins.testkit.chain.fixture._ import org.bitcoins.testkit.fixtures.BitcoinSFixture import org.bitcoins.testkit.rpc.BitcoindRpcTestUtil @@ -211,19 +215,6 @@ trait ChainUnitTest } yield (chainHandler, zmqSubscriber) } - def createChainApiWithBitcoindRpc( - bitcoind: BitcoindRpcClient): Future[BitcoindChainHandlerViaRpc] = { - val handlerWithGenesisHeaderF = - ChainUnitTest.setupHeaderTableWithGenesisHeader() - - val chainHandlerF = handlerWithGenesisHeaderF.map(_._1) - - chainHandlerF.map { handler => - chain.fixture.BitcoindChainHandlerViaRpc(bitcoind, handler) - } - - } - def createBitcoindChainHandlerViaZmq(): Future[BitcoindChainHandlerViaZmq] = { composeBuildersAndWrap(() => BitcoinSFixture.createBitcoind(), createChainHandlerWithBitcoindZmq, @@ -238,19 +229,11 @@ trait ChainUnitTest bitcoindChainHandler.bitcoindRpc, bitcoindChainHandler.chainHandler) - destroyBitcoindChainApiViaRpc(rpc).map { _ => + ChainUnitTest.destroyBitcoindChainApiViaRpc(rpc).map { _ => bitcoindChainHandler.zmqSubscriber.stop } } - def destroyBitcoindChainApiViaRpc( - bitcoindChainHandler: BitcoindChainHandlerViaRpc): Future[Unit] = { - val stopBitcoindF = - BitcoindRpcTestUtil.stopServer(bitcoindChainHandler.bitcoindRpc) - val dropTableF = ChainUnitTest.destroyAllTables() - stopBitcoindF.flatMap(_ => dropTableF) - } - /** * Creates a [[org.bitcoins.rpc.client.common.BitcoindRpcClient BitcoindRpcClient]] that is linked to our [[org.bitcoins.chain.blockchain.ChainHandler ChainHandler]] * via a [[org.bitcoins.zmq.ZMQSubscriber zmq]]. This means messages are passed between bitcoin and our chain handler @@ -273,10 +256,22 @@ trait ChainUnitTest def withBitcoindChainHandlerViaRpc(test: OneArgAsyncTest)( implicit system: ActorSystem): FutureOutcome = { val builder: () => Future[BitcoindChainHandlerViaRpc] = { () => - BitcoinSFixture.createBitcoind().flatMap(createChainApiWithBitcoindRpc) + BitcoinSFixture + .createBitcoind() + .flatMap(ChainUnitTest.createChainApiWithBitcoindRpc) } - makeDependentFixture(builder, destroyBitcoindChainApiViaRpc)(test) + makeDependentFixture(builder, ChainUnitTest.destroyBitcoindChainApiViaRpc)( + test) + } + + def withBitcoindV19ChainHandlerViaRpc(test: OneArgAsyncTest)( + implicit system: ActorSystem): FutureOutcome = { + val builder: () => Future[BitcoindV19ChainHandler] = { () => + ChainUnitTest.createBitcoindV19ChainHandler() + } + makeDependentFixture(builder, ChainUnitTest.destroyBitcoindV19ChainApi)( + test) } } @@ -406,6 +401,54 @@ object ChainUnitTest extends ChainVerificationLogger { } } + def createChainApiWithBitcoindRpc(bitcoind: BitcoindRpcClient)( + implicit ec: ExecutionContext, + chainAppConfig: ChainAppConfig): Future[BitcoindChainHandlerViaRpc] = { + val handlerWithGenesisHeaderF = + ChainUnitTest.setupHeaderTableWithGenesisHeader() + + val chainHandlerF = handlerWithGenesisHeaderF.map(_._1) + + chainHandlerF.map { handler => + chain.fixture.BitcoindChainHandlerViaRpc(bitcoind, handler) + } + } + + def destroyBitcoindChainApiViaRpc( + bitcoindChainHandler: BitcoindChainHandlerViaRpc)( + implicit system: ActorSystem, + chainAppConfig: ChainAppConfig): Future[Unit] = { + import system.dispatcher + val stopBitcoindF = + BitcoindRpcTestUtil.stopServer(bitcoindChainHandler.bitcoindRpc) + val dropTableF = ChainUnitTest.destroyAllTables() + stopBitcoindF.flatMap(_ => dropTableF) + } + + def createBitcoindV19ChainHandler()( + implicit system: ActorSystem, + chainAppConfig: ChainAppConfig): Future[BitcoindV19ChainHandler] = { + import system.dispatcher + val bitcoindV = BitcoindVersion.V19 + BitcoinSFixture + .createBitcoind(Some(bitcoindV)) + .flatMap(createChainApiWithBitcoindRpc) + .map { b: BitcoindChainHandlerViaRpc => + BitcoindV19ChainHandler( + b.bitcoindRpc.asInstanceOf[BitcoindV19RpcClient], + b.chainHandler) + } + } + + def destroyBitcoindV19ChainApi( + bitcoindV19ChainHandler: BitcoindV19ChainHandler)( + implicit system: ActorSystem, + chainAppConfig: ChainAppConfig): Future[Unit] = { + val b = BitcoindChainHandlerViaRpc(bitcoindV19ChainHandler.bitcoind, + bitcoindV19ChainHandler.chainHandler) + destroyBitcoindChainApiViaRpc(b) + } + def destroyBitcoind(bitcoind: BitcoindRpcClient)( implicit system: ActorSystem): Future[Unit] = { BitcoindRpcTestUtil.stopServer(bitcoind) @@ -465,4 +508,20 @@ object ChainUnitTest extends ChainVerificationLogger { } + /** Syncs the given chain handler to the given bitcoind */ + def syncFromBitcoind(bitcoind: BitcoindRpcClient, chainHandler: ChainHandler)( + implicit ec: ExecutionContext, + chainAppConfig: ChainAppConfig): Future[ChainApi] = { + //sync headers + //first we need to implement the 'getBestBlockHashFunc' and 'getBlockHeaderFunc' functions + val getBestBlockHashFunc = { () => + bitcoind.getBestBlockHash + } + + val getBlockHeaderFunc = { hash: DoubleSha256DigestBE => + bitcoind.getBlockHeader(hash).map(_.blockHeader) + } + + ChainSync.sync(chainHandler, getBlockHeaderFunc, getBestBlockHashFunc) + } } diff --git a/testkit/src/main/scala/org/bitcoins/testkit/chain/SyncUtil.scala b/testkit/src/main/scala/org/bitcoins/testkit/chain/SyncUtil.scala new file mode 100644 index 0000000000..86397e7e59 --- /dev/null +++ b/testkit/src/main/scala/org/bitcoins/testkit/chain/SyncUtil.scala @@ -0,0 +1,43 @@ +package org.bitcoins.testkit.chain + +import org.bitcoins.chain.blockchain.sync.FilterWithHeaderHash +import org.bitcoins.core.crypto.DoubleSha256DigestBE +import org.bitcoins.core.gcs.{FilterType, GolombFilter} +import org.bitcoins.core.protocol.blockchain.BlockHeader +import org.bitcoins.rpc.client.common.BitcoindRpcClient +import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient +import org.bitcoins.rpc.jsonmodels.GetBlockFilterResult + +import scala.concurrent.{ExecutionContext, Future} + +/** Useful utilities to use in the chain project for syncing things against bitcoind */ +abstract class SyncUtil { + + /** Creates a function that will retrun bitcoin's best block hash when called */ + def getBestBlockHashFunc( + bitcoind: BitcoindRpcClient): () => Future[DoubleSha256DigestBE] = { () => + bitcoind.getBestBlockHash + } + + /** Creates a function that you can pass a hash to and it returns the block header */ + def getBlockHeaderFunc(bitcoind: BitcoindRpcClient)( + implicit ec: ExecutionContext): DoubleSha256DigestBE => Future[ + BlockHeader] = { hash: DoubleSha256DigestBE => + bitcoind.getBlockHeader(hash).map(_.blockHeader) + } + + /** Creates a function that you can pass a block header to and it's return's it's [[GolombFilter]] */ + def getFilterFunc(bitcoind: BitcoindV19RpcClient, filterType: FilterType)( + implicit ec: ExecutionContext): BlockHeader => Future[ + FilterWithHeaderHash] = { + case header: BlockHeader => + val prevFilterResultF = + bitcoind.getBlockFilter(header.hashBE, filterType) + prevFilterResultF.map { + case GetBlockFilterResult(filter, header) => + FilterWithHeaderHash(filter, header) + } + } +} + +object SyncUtil extends SyncUtil diff --git a/testkit/src/main/scala/org/bitcoins/testkit/chain/fixture/BitcoindChainHandlerViaRpc.scala b/testkit/src/main/scala/org/bitcoins/testkit/chain/fixture/BitcoindChainHandlerViaRpc.scala index 3c69cdb5db..fc45d7212b 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/chain/fixture/BitcoindChainHandlerViaRpc.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/chain/fixture/BitcoindChainHandlerViaRpc.scala @@ -3,7 +3,7 @@ package org.bitcoins.testkit.chain.fixture import org.bitcoins.chain.blockchain.ChainHandler import org.bitcoins.rpc.client.common.BitcoindRpcClient -/** Represents a bitcoind instance paired with a chain handler via zmq */ +/** Represents a bitcoind instance paired with a chain handler via rpc */ case class BitcoindChainHandlerViaRpc( bitcoindRpc: BitcoindRpcClient, chainHandler: ChainHandler) diff --git a/testkit/src/main/scala/org/bitcoins/testkit/fixtures/BitcoinSFixture.scala b/testkit/src/main/scala/org/bitcoins/testkit/fixtures/BitcoinSFixture.scala index 3e16291e0c..1c7be9df9a 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/fixtures/BitcoinSFixture.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/fixtures/BitcoinSFixture.scala @@ -150,10 +150,11 @@ object BitcoinSFixture { /** Creates a new bitcoind instance */ def createBitcoind(versionOpt: Option[BitcoindVersion] = None)( implicit system: ActorSystem): Future[BitcoindRpcClient] = { - import system.dispatcher val instance = BitcoindRpcTestUtil.instance(versionOpt = versionOpt) - val bitcoind = BitcoindRpcClient.withActorSystem(instance) - - bitcoind.start().map(_ => bitcoind) + val bitcoind = versionOpt match { + case Some(v) => BitcoindRpcClient.fromVersion(v, instance) + case None => new BitcoindRpcClient(instance) + } + bitcoind.start() } } diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala index 3a324b4e78..48402ab21f 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala @@ -49,7 +49,9 @@ private[wallet] trait RescanHandling extends WalletLogger { addressBatchSize: Int): Future[Unit] = { for { scriptPubKeys <- generateScriptPubKeys(addressBatchSize) - blocks <- matchBlocks(scriptPubKeys, endOpt, startOpt) + blocks <- matchBlocks(scriptPubKeys = scriptPubKeys, + endOpt = endOpt, + startOpt = startOpt) _ <- downloadAndProcessBlocks(blocks) externalGap <- calcAddressGap(HDChainType.External) changeGap <- calcAddressGap(HDChainType.Change) @@ -119,8 +121,10 @@ private[wallet] trait RescanHandling extends WalletLogger { Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors() * 2) val blocksF = for { - blocks <- getMatchingBlocks(scriptPubKeys, startOpt, endOpt)( - ExecutionContext.fromExecutor(threadPool)) + blocks <- getMatchingBlocks( + scripts = scriptPubKeys, + startOpt = startOpt, + endOpt = endOpt)(ExecutionContext.fromExecutor(threadPool)) } yield { blocks.sortBy(_.blockHeight).map(_.blockHash.flip) } diff --git a/website/sidebars.json b/website/sidebars.json index dfb4c59b6a..d9f8eb0abb 100644 --- a/website/sidebars.json +++ b/website/sidebars.json @@ -22,6 +22,7 @@ ], "Applications": [ "applications/chain", + "applications/filter-sync", "applications/cli", "applications/configuration", "applications/dlc",