diff --git a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala index 6e9c209981..3b1a7fb1bd 100644 --- a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala +++ b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/client/common/BitcoindRpcClient.scala @@ -3,6 +3,10 @@ package org.bitcoins.rpc.client.common import java.io.File import akka.actor.ActorSystem +import org.bitcoins.rpc.client.v16.BitcoindV16RpcClient +import org.bitcoins.rpc.client.v17.BitcoindV17RpcClient +import org.bitcoins.rpc.client.v18.BitcoindV18RpcClient +import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient import org.bitcoins.rpc.config.{BitcoindConfig, BitcoindInstance} /** @@ -81,6 +85,22 @@ object BitcoindRpcClient { val cli = BitcoindRpcClient(instance) cli } + + /** Returns a bitcoind with the appropriated version you passed in, the bitcoind is NOT started. */ + def fromVersion(version: BitcoindVersion, instance: BitcoindInstance)( + implicit system: ActorSystem): BitcoindRpcClient = { + val bitcoind = version match { + case BitcoindVersion.V16 => BitcoindV16RpcClient.withActorSystem(instance) + case BitcoindVersion.V17 => BitcoindV17RpcClient.withActorSystem(instance) + case BitcoindVersion.V18 => BitcoindV18RpcClient.withActorSystem(instance) + case BitcoindVersion.V19 => BitcoindV19RpcClient.withActorSystem(instance) + case BitcoindVersion.Experimental | BitcoindVersion.Unknown => + sys.error( + s"Cannot create a bitcoind from a unknown or experimental version") + } + + bitcoind + } } sealed trait BitcoindVersion diff --git a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/ChainSyncTest.scala b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/ChainSyncTest.scala index 14c6e59130..208cd34ae4 100644 --- a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/ChainSyncTest.scala +++ b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/ChainSyncTest.scala @@ -2,8 +2,9 @@ package org.bitcoins.chain.blockchain.sync import akka.actor.ActorSystem import org.bitcoins.chain.api.ChainApi +import org.bitcoins.chain.blockchain.ChainHandler import org.bitcoins.core.crypto.DoubleSha256DigestBE -import org.bitcoins.testkit.chain.ChainUnitTest +import org.bitcoins.testkit.chain.{ChainUnitTest, SyncUtil} import org.bitcoins.testkit.chain.fixture.BitcoindChainHandlerViaRpc import org.scalatest.FutureOutcome @@ -26,13 +27,9 @@ class ChainSyncTest extends ChainUnitTest { val bitcoind = bitcoindWithChainHandler.bitcoindRpc val chainHandler = bitcoindWithChainHandler.chainHandler //first we need to implement the 'getBestBlockHashFunc' and 'getBlockHeaderFunc' functions - val getBestBlockHashFunc = { () => - bitcoind.getBestBlockHash - } + val getBestBlockHashFunc = SyncUtil.getBestBlockHashFunc(bitcoind) - val getBlockHeaderFunc = { hash: DoubleSha256DigestBE => - bitcoind.getBlockHeader(hash).map(_.blockHeader) - } + val getBlockHeaderFunc = SyncUtil.getBlockHeaderFunc(bitcoind) //let's generate a block on bitcoind val block1F = @@ -72,4 +69,53 @@ class ChainSyncTest extends ChainUnitTest { chainHandler.getBlockCount.map(count => assert(count == 0)) } } + + it must "be able to call sync() twice and not fail when nothing has happened" in { + bitcoindWithChainHandler => + val bitcoind = bitcoindWithChainHandler.bitcoindRpc + val chainHandler = bitcoindWithChainHandler.chainHandler + //first we need to implement the 'getBestBlockHashFunc' and 'getBlockHeaderFunc' functions + val getBestBlockHashFunc = SyncUtil.getBestBlockHashFunc(bitcoind) + + val getBlockHeaderFunc = SyncUtil.getBlockHeaderFunc(bitcoind) + + val generate1F = for { + addr <- bitcoind.getNewAddress + hashes <- bitcoind.generateToAddress(1, addr) + } yield hashes + + val sync1F: Future[ChainApi] = generate1F.flatMap { _ => + ChainSync.sync(chainHandler = chainHandler, + getBlockHeaderFunc = getBlockHeaderFunc, + getBestBlockHashFunc = getBestBlockHashFunc) + } + + val assertion1F = for { + hashes <- generate1F + chainApiSync1 <- sync1F + count <- chainApiSync1.getBlockCount() + bestHash <- chainApiSync1.getBestBlockHash() + } yield { + assert(count == 1) + assert(bestHash == hashes.head) + } + + //let's call sync again and make sure nothing bad happens + val sync2F = for { + _ <- assertion1F + chainApiSync1 <- sync1F + chainApiSync2 <- ChainSync.sync( + chainHandler = chainApiSync1.asInstanceOf[ChainHandler], + getBlockHeaderFunc = getBlockHeaderFunc, + getBestBlockHashFunc = getBestBlockHashFunc) + count <- chainApiSync2.getBlockCount() + hashes <- generate1F + bestHash <- chainApiSync2.getBestBlockHash() + } yield { + assert(count == 1) + assert(bestHash == hashes.head) + } + + sync2F + } } diff --git a/chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/FilterSyncTest.scala b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/FilterSyncTest.scala new file mode 100644 index 0000000000..094d5bc28b --- /dev/null +++ b/chain-test/src/test/scala/org/bitcoins/chain/blockchain/sync/FilterSyncTest.scala @@ -0,0 +1,127 @@ +package org.bitcoins.chain.blockchain.sync + +import org.bitcoins.chain.api.ChainApi +import org.bitcoins.chain.blockchain.ChainHandler +import org.bitcoins.core.gcs.{FilterType, GolombFilter} +import org.bitcoins.core.protocol.blockchain.BlockHeader +import org.bitcoins.testkit.chain.fixture.BitcoindV19ChainHandler +import org.bitcoins.testkit.chain.{ChainUnitTest, SyncUtil} +import org.scalatest.FutureOutcome + +import scala.concurrent.Future + +class FilterSyncTest extends ChainUnitTest { + + override type FixtureParam = BitcoindV19ChainHandler + + override def withFixture(test: OneArgAsyncTest): FutureOutcome = { + withBitcoindV19ChainHandlerViaRpc(test) + } + + behavior of "FilterSync" + + it must "sync 1 filter header from an external data source" in { fixture => + val BitcoindV19ChainHandler(bitcoind, chainHandler) = fixture + + val initFilterCountF = chainHandler.getFilterCount + val initFilterHeaderCountF = chainHandler.getFilterHeaderCount + val initAssertionsF = for { + initFilterCount <- initFilterCountF + initFilterHeaderCount <- initFilterHeaderCountF + } yield { + assert(initFilterCount == 0) + assert(initFilterHeaderCount == 0) + } + + val generated1BlockF = for { + _ <- initAssertionsF + addr <- bitcoind.getNewAddress + hashes <- bitcoind.generateToAddress(1, addr) + } yield hashes + + val syncedF = generated1BlockF.flatMap { _ => + syncHelper(fixture) + } + + for { + syncedChainApi <- syncedF + filterHeaderCount <- syncedChainApi.getFilterHeaderCount() + _ = assert(filterHeaderCount == 1) + filterCount <- syncedChainApi.getFilterCount() + } yield assert(filterCount == 1) + } + + it must "sync a bunch of filter headers from an external data source" in { + fixture => + val BitcoindV19ChainHandler(bitcoind, chainHandler) = fixture + + val numBlocks = 100 + val generatedBlocksF = for { + addr <- bitcoind.getNewAddress + hashes <- bitcoind.generateToAddress(numBlocks, addr) + } yield hashes + + val syncedF = generatedBlocksF.flatMap { _ => + syncHelper(fixture) + } + + for { + syncedChainApi <- syncedF + filterHeaderCount <- syncedChainApi.getFilterHeaderCount() + _ = assert(filterHeaderCount == numBlocks) + filterCount <- syncedChainApi.getFilterCount() + } yield assert(filterCount == numBlocks) + } + + it must "be able to call filterSync() and not fail when nothing has happened" in { + fixture => + val BitcoindV19ChainHandler(bitcoind, chainHandler) = fixture + + val generated1BlockF = for { + addr <- bitcoind.getNewAddress + hashes <- bitcoind.generateToAddress(1, addr) + } yield hashes + + val synced1F = generated1BlockF.flatMap { _ => + syncHelper(fixture) + } + + val sync2F = synced1F.flatMap { chainApi => + syncHelper( + fixture.copy(chainHandler = chainApi.asInstanceOf[ChainHandler])) + } + + for { + syncedChainApi <- sync2F + filterHeaderCount <- syncedChainApi.getFilterHeaderCount() + _ = assert(filterHeaderCount == 1) + filterCount <- syncedChainApi.getFilterCount() + } yield assert(filterCount == 1) + } + + private def syncHelper( + bitcoindV19ChainHandler: BitcoindV19ChainHandler): Future[ChainApi] = { + val filterType = FilterType.Basic + val BitcoindV19ChainHandler(bitcoind, chainHandler) = + bitcoindV19ChainHandler + val getBestBlockHashFunc = SyncUtil.getBestBlockHashFunc(bitcoind) + val getBlockHeaderFunc = SyncUtil.getBlockHeaderFunc(bitcoind) + + val getFilterFunc: BlockHeader => Future[FilterWithHeaderHash] = + SyncUtil.getFilterFunc(bitcoind, filterType) + + //first sync the chain + val syncedHeadersF = ChainSync.sync(chainHandler = chainHandler, + getBlockHeaderFunc = getBlockHeaderFunc, + getBestBlockHashFunc = + getBestBlockHashFunc) + + //now sync filters + syncedHeadersF.flatMap { syncedChainHandler => + FilterSync.syncFilters( + chainApi = syncedChainHandler, + getFilterFunc = getFilterFunc + ) + } + } +} diff --git a/chain/src/main/scala/org/bitcoins/chain/api/ChainApi.scala b/chain/src/main/scala/org/bitcoins/chain/api/ChainApi.scala index 191b6ea060..4636359d20 100644 --- a/chain/src/main/scala/org/bitcoins/chain/api/ChainApi.scala +++ b/chain/src/main/scala/org/bitcoins/chain/api/ChainApi.scala @@ -120,6 +120,12 @@ trait ChainApi extends ChainQueryApi { def getFilterHeadersAtHeight( height: Int): Future[Vector[CompactFilterHeaderDb]] + /** Finds the "best" filter header we have stored in our database + * What this means in practice is the latest filter header we + * have received from our peer. + * */ + def getBestFilterHeader(): Future[CompactFilterHeaderDb] + /** * Looks up a compact filter header by its hash. */ @@ -141,4 +147,9 @@ trait ChainApi extends ChainQueryApi { /** Returns the block height of the given block stamp */ def getHeightByBlockStamp(blockStamp: BlockStamp): Future[Int] + + /** Fetchs the block headers between from (exclusive) and to (inclusive) */ + def getHeadersBetween( + from: BlockHeaderDb, + to: BlockHeaderDb): Future[Vector[BlockHeaderDb]] } diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala index 78a6d13807..c48dd91693 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala @@ -193,9 +193,9 @@ case class ChainHandler( filterHeaders: Vector[FilterHeader], stopHash: DoubleSha256DigestBE): Future[ChainApi] = { - val filterHeadersToCreateF = for { + val filterHeadersToCreateF: Future[Vector[CompactFilterHeaderDb]] = for { blockHeaders <- blockHeaderDAO - .getNChildren(stopHash, filterHeaders.size - 1) + .getNChildren(ancestorHash = stopHash, n = filterHeaders.size - 1) .map(_.sortBy(_.height)) } yield { if (blockHeaders.size != filterHeaders.size) { @@ -214,17 +214,24 @@ case class ChainHandler( for { filterHeadersToCreate <- filterHeadersToCreateF _ <- if (filterHeadersToCreate.nonEmpty && filterHeadersToCreate.head.height > 0) { - filterHeaderDAO - .findByHash(filterHeadersToCreate.head.previousFilterHeaderBE) - .map { prevHeaderOpt => + val firstFilter = filterHeadersToCreate.head + val filterHashFOpt = filterHeaderDAO + .findByHash(firstFilter.previousFilterHeaderBE) + filterHashFOpt.map { + case Some(prevHeader) => require( - prevHeaderOpt.nonEmpty, - s"Previous filter header does not exist: ${filterHeadersToCreate.head.previousFilterHeaderBE}") - require( - prevHeaderOpt.get.height == filterHeadersToCreate.head.height - 1, - s"Unexpected previous header's height: ${prevHeaderOpt.get.height} != ${filterHeadersToCreate.head.height - 1}" + prevHeader.height == firstFilter.height - 1, + s"Unexpected previous header's height: ${prevHeader.height} != ${filterHeadersToCreate.head.height - 1}" ) - } + case None => + if (firstFilter.previousFilterHeaderBE == DoubleSha256DigestBE.empty && firstFilter.height == 0) { + //we are ok, according to BIP157 the previous the genesis filter's prev hash should + //be the empty hash + () + } else { + sys.error(s"Previous filter header does not exist: $firstFilter") + } + } } else FutureUtil.unit _ <- filterHeaderDAO.createAll(filterHeadersToCreate) } yield this @@ -347,6 +354,32 @@ case class ChainHandler( height: Int): Future[Vector[CompactFilterHeaderDb]] = filterHeaderDAO.getAtHeight(height) + /** @inheritdoc */ + override def getBestFilterHeader(): Future[CompactFilterHeaderDb] = { + //this seems realy brittle, is there a guarantee + //that the highest filter header count is our best filter header? + val filterCountF = getFilterHeaderCount() + val ourBestFilterHeader = for { + count <- filterCountF + filterHeader <- getFilterHeadersAtHeight(count) + } yield { + //TODO: Figure out what the best way to select + //the best filter header is if we have competing + //chains. (Same thing applies to getBestBlockHash() + //for now, just do the dumb thing and pick the first one + filterHeader match { + case tip1 +: _ +: _ => + tip1 + case tip +: _ => + tip + case Vector() => + sys.error(s"No filter headers found in database!") + } + } + + ourBestFilterHeader + } + /** @inheritdoc */ override def getFilterHeader( blockHash: DoubleSha256DigestBE): Future[Option[CompactFilterHeaderDb]] = @@ -410,6 +443,28 @@ case class ChainHandler( .map(dbos => dbos.map(dbo => FilterResponse(dbo.golombFilter, dbo.blockHashBE, dbo.height))) + + /** @inheritdoc */ + override def getHeadersBetween( + from: BlockHeaderDb, + to: BlockHeaderDb): Future[Vector[BlockHeaderDb]] = { + logger.info(s"Finding headers from=$from to=$to") + def loop( + currentF: Future[BlockHeaderDb], + accum: Vector[BlockHeaderDb]): Future[Vector[BlockHeaderDb]] = { + currentF.flatMap { current => + if (current.previousBlockHashBE == from.hashBE) { + Future.successful(current +: accum) + } else { + val nextOptF = getHeader(current.previousBlockHashBE) + val nextF = nextOptF.map(_.getOrElse( + sys.error(s"Could not find header=${current.previousBlockHashBE}"))) + loop(nextF, current +: accum) + } + } + } + loop(Future.successful(to), Vector.empty) + } } object ChainHandler { diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/ChainSync.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/ChainSync.scala index 5832d6a82d..640cf99dac 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/ChainSync.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/ChainSync.scala @@ -1,16 +1,16 @@ package org.bitcoins.chain.blockchain.sync +import org.bitcoins.chain.ChainVerificationLogger import org.bitcoins.chain.api.ChainApi import org.bitcoins.chain.blockchain.ChainHandler +import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.chain.models.BlockHeaderDb import org.bitcoins.core.crypto.DoubleSha256DigestBE import org.bitcoins.core.protocol.blockchain.BlockHeader import scala.concurrent.{ExecutionContext, Future} -import org.bitcoins.chain.config.ChainAppConfig -import org.bitcoins.chain.ChainVerificationLogger -trait ChainSync extends ChainVerificationLogger { +abstract class ChainSync extends ChainVerificationLogger { /** This method checks if our chain handler has the tip of the blockchain as an external source * If we do not have the same chain, we sync our chain handler until we are at the same best block hash @@ -74,12 +74,11 @@ trait ChainSync extends ChainVerificationLogger { require(tips.nonEmpty, s"Cannot sync without the genesis block") //we need to walk backwards on the chain until we get to one of our tips - val tipsBH = tips.map(_.blockHeader) def loop( lastHeaderF: Future[BlockHeader], - accum: List[BlockHeader]): Future[List[BlockHeader]] = { + accum: Vector[BlockHeader]): Future[Vector[BlockHeader]] = { lastHeaderF.flatMap { lastHeader => if (tipsBH.contains(lastHeader)) { //means we have synced back to a block that we know @@ -117,18 +116,17 @@ trait ChainSync extends ChainVerificationLogger { } else { //this represents all headers we have received from our external data source //and need to process with our chain handler - val headersToSyncF = loop(bestHeaderF, List.empty) + val headersToSyncF = loop(bestHeaderF, Vector.empty) //now we are going to add them to our chain and return the chain api headersToSyncF.flatMap { headers => logger.info( s"Attempting to sync ${headers.length} blockheader to our chainstate") - chainApi.processHeaders(headers.toVector) + chainApi.processHeaders(headers) } } } - } } diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/FilterSync.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/FilterSync.scala new file mode 100644 index 0000000000..5e02e5e40f --- /dev/null +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/FilterSync.scala @@ -0,0 +1,224 @@ +package org.bitcoins.chain.blockchain.sync + +import org.bitcoins.chain.ChainVerificationLogger +import org.bitcoins.chain.api.ChainApi +import org.bitcoins.chain.config.ChainAppConfig +import org.bitcoins.chain.models.{BlockHeaderDb, CompactFilterHeaderDb} +import org.bitcoins.core.gcs.{FilterHeader, GolombFilter} +import org.bitcoins.core.p2p.CompactFilterMessage +import org.bitcoins.core.protocol.blockchain.BlockHeader + +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, Future} + +/** A class that is meant to expose and api to sync + * [[GolombFilter]]s and [[FilterHeader]]s from an external + * data source. The important thing to implement is + * {{{ + * getFilterFunc: BlockHeader => Future[GolombFilter] + * }}} + * which will allow us to sync our internal filters against. + * + * It should be noted you are entirely trusting the provider + * of the `getFilterFunc` as you aren't able to validate the result + * against another peer that as BIP157 specifies + * + * @see [[https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki#client-operation]] + * */ +abstract class FilterSync extends ChainVerificationLogger { + + def syncFilters( + chainApi: ChainApi, + getFilterFunc: BlockHeader => Future[FilterWithHeaderHash], + batchSize: Int = 25)( + implicit ec: ExecutionContext, + chainAppConfig: ChainAppConfig): Future[ChainApi] = { + + val ourBestFilterHeaderF = chainApi.getBestFilterHeader() + val ourBestBlockHeaderF = chainApi.getBestBlockHeader() + for { + ours <- ourBestFilterHeaderF + ourBestBlockHeader <- ourBestBlockHeaderF + syncedChainApi <- syncFiltersToTip(chainApi = chainApi, + ourBestHeader = ourBestBlockHeader, + ourBestFilterHeader = ours, + getFilterFunc = getFilterFunc, + batchSize) + } yield { + syncedChainApi + } + } + + private case class BlockFilterAggregated( + filterHeader: FilterHeader, + filter: GolombFilter, + blockHeader: BlockHeader) + + /** + * Syncs our best filter header to our best block hash + * @param chainApi our current chain state + * @param ourBestHeader the block header we are going to sync filters up until + * @param ourBestFilterHeader the best filter header we have + * @param getFilterFunc given a block hash it retrieves filter associated with that hash from our external source + * @param ec + * @return + */ + private def syncFiltersToTip( + chainApi: ChainApi, + ourBestHeader: BlockHeaderDb, + ourBestFilterHeader: CompactFilterHeaderDb, + getFilterFunc: BlockHeader => Future[FilterWithHeaderHash], + batchSize: Int)( + implicit ec: ExecutionContext, + chainAppConfig: ChainAppConfig): Future[ChainApi] = { + if (ourBestFilterHeader.blockHashBE == ourBestHeader.hashBE) { + logger.info( + s"Our filters are synced with our peers filters, both at blockHash=${ourBestFilterHeader.blockHashBE}") + Future.successful(chainApi) + } else { + logger.info( + s"Beginning sync for filters from filterheader=${ourBestFilterHeader} to blockheader=${ourBestHeader.hashBE}") + //let's fetch all missing filter headers first + val bestFilterBlockHeaderF = + chainApi.getHeader(ourBestFilterHeader.blockHashBE) + + val headersMissingFiltersF = for { + bestFilterBlockHeader <- bestFilterBlockHeaderF + missing <- chainApi.getHeadersBetween(from = bestFilterBlockHeader.get, + to = ourBestHeader) + } yield { + missing + } + + //because filters can be really large, we don't want to process too many + //at once, so batch them in groups and the process them. + val groupedHeadersF: Future[Iterator[Vector[BlockHeaderDb]]] = for { + missing <- headersMissingFiltersF + } yield missing.grouped(batchSize) + + val init = Future.successful(chainApi) + for { + groupedHeaders <- groupedHeadersF + finalChainApi <- { + groupedHeaders.foldLeft(init) { + case (apiF, missingHeaders) => + for { + api <- apiF + bestFilter <- api.getBestFilterHeader() + newApi <- fetchFiltersForHeaderGroup(api, + missingHeaders, + bestFilter, + getFilterFunc) + } yield newApi + } + } + } yield finalChainApi + } + } + + private def fetchFiltersForHeaderGroup( + chainApi: ChainApi, + missingHeaders: Vector[BlockHeaderDb], + ourBestFilterHeader: CompactFilterHeaderDb, + getFilterFunc: BlockHeader => Future[FilterWithHeaderHash])( + implicit ec: ExecutionContext, + chainAppConfig: ChainAppConfig): Future[ChainApi] = { + //now that we have headers that are missing filters, let's fetch the filters + + val fetchNested = missingHeaders.map { b => + val filterF = getFilterFunc(b.blockHeader) + filterF.map(f => (b, f)) + } + + val fetchFiltersF: Future[Vector[(BlockHeaderDb, FilterWithHeaderHash)]] = { + Future.sequence(fetchNested) + } + + //now let's build filter headers + val blockFiltersAggF: Future[Vector[BlockFilterAggregated]] = { + fetchFiltersF.map { + case filters: Vector[(BlockHeaderDb, FilterWithHeaderHash)] => + buildBlockFilterAggregated(filters, ourBestFilterHeader) + } + } + + val compactFiltersF = blockFiltersAggF.map { filtersAgg => + filtersAgg.map { agg => + CompactFilterMessage(blockHash = agg.blockHeader.hash, + filter = agg.filter) + } + } + + val blockHeaderOptF = blockFiltersAggF.map { filtersAgg => + filtersAgg.lastOption.map(_.blockHeader) + } + val filterHeadersF = blockFiltersAggF.map(_.map(_.filterHeader)) + + for { + blockHeaderOpt <- blockHeaderOptF + compactFilters <- compactFiltersF + filterHeaders <- filterHeadersF + filtersChainApi <- { + blockHeaderOpt match { + case None => + logger.info( + s"We did not have a block header to process filter headers with! filterHeaders=${filterHeaders} " + + s"compactFilters=${compactFilters} ourBestFilterHeader=${ourBestFilterHeader}") + Future.successful(chainApi) + case Some(blockHeader) => + for { + headersChainApi <- chainApi.processFilterHeaders( + filterHeaders, + blockHeader.hashBE) + filtersChainApi <- headersChainApi.processFilters(compactFilters) + } yield filtersChainApi + } + } + } yield { + filtersChainApi + } + } + + /** This builds a [[BlockFilterAggregated]] data structure + * and verifies that the filter header hash from an external + * data source matches the hash of the header we generated internally. + * If the hash does not match, someone is likely feeding you a bad header chain. + * */ + private def buildBlockFilterAggregated( + filters: Vector[(BlockHeaderDb, FilterWithHeaderHash)], + ourBestFilterHeader: CompactFilterHeaderDb): Vector[ + BlockFilterAggregated] = { + + val accum = new mutable.ArrayBuffer[BlockFilterAggregated](filters.length) + + filters.foreach { + case (blockHeaderDb, filterWithHash) => + val FilterWithHeaderHash(filter, expectedHeaderHash) = filterWithHash + val filterHeader = if (accum.isEmpty) { + //first header to connect with our internal headers + //that have already been validated + FilterHeader(filterHash = filter.hash, + prevHeaderHash = ourBestFilterHeader.hashBE.flip) + } else { + //get previous filter header's hash + val prevHeaderHash = accum.last.filterHeader.hash + FilterHeader(filterHash = filter.hash, + prevHeaderHash = prevHeaderHash) + } + if (filterHeader.hash == expectedHeaderHash.flip) { + val agg = BlockFilterAggregated(filterHeader, + filter, + blockHeaderDb.blockHeader) + accum.append(agg) + } else { + sys.error( + s"The header we created was different from the expected hash we received " + + s"from an external data source! Something is wrong. Our filterHeader=${filterHeader} expectedHash=$expectedHeaderHash") + } + } + + accum.toVector + } +} + +object FilterSync extends FilterSync diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/FilterWithHeaderHash.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/FilterWithHeaderHash.scala new file mode 100644 index 0000000000..36bc21b5b4 --- /dev/null +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/sync/FilterWithHeaderHash.scala @@ -0,0 +1,11 @@ +package org.bitcoins.chain.blockchain.sync + +import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} +import org.bitcoins.core.gcs.GolombFilter + +/** Represents a [[GolombFilter]] with it's [[org.bitcoins.core.gcs.FilterHeader]] associated with it + * This is needed because bitcoin core's 'getblockfilter' rpc returns things in this structure + * */ +case class FilterWithHeaderHash( + filter: GolombFilter, + headerHash: DoubleSha256DigestBE) diff --git a/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala b/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala index 9461a23c2b..0bc40373b8 100644 --- a/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala +++ b/chain/src/main/scala/org/bitcoins/chain/models/BlockHeaderDAO.scala @@ -141,7 +141,7 @@ case class BlockHeaderDAO()( } } - /** Gets Block Headers of all childred starting with the given block hash (inclusive), could be out of order */ + /** Gets Block Headers of all children starting with the given block hash (inclusive), could be out of order */ def getNChildren( ancestorHash: DoubleSha256DigestBE, n: Int): Future[Vector[BlockHeaderDb]] = { diff --git a/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterHeaderTable.scala b/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterHeaderTable.scala index 4305f4f0ad..7995699aa9 100644 --- a/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterHeaderTable.scala +++ b/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterHeaderTable.scala @@ -14,6 +14,10 @@ case class CompactFilterHeaderDb( def filterHeader: FilterHeader = FilterHeader(filterHashBE.flip, previousFilterHeaderBE.flip) + + override def toString: String = { + s"CompactFilterDb(hashBE=$hashBE,filterHashBE=$filterHashBE,previousFilterHeaderBE=$previousFilterHeaderBE,blockHashBE=$blockHashBE,height=$height)" + } } object CompactFilterHeaderDbHelper { @@ -21,7 +25,7 @@ object CompactFilterHeaderDbHelper { def fromFilterHeader( filterHeader: FilterHeader, blockHash: DoubleSha256DigestBE, - height: Int) = + height: Int): CompactFilterHeaderDb = CompactFilterHeaderDb( hashBE = filterHeader.hash.flip, filterHashBE = filterHeader.filterHash.flip, diff --git a/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterTable.scala b/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterTable.scala index afda8f0afb..2f7844db66 100644 --- a/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterTable.scala +++ b/chain/src/main/scala/org/bitcoins/chain/models/CompactFilterTable.scala @@ -13,10 +13,17 @@ case class CompactFilterDb( bytes: ByteVector, height: Int, blockHashBE: DoubleSha256DigestBE) { + require( + CryptoUtil.doubleSHA256(bytes).flip == hashBE, + s"Bytes must hash to hashBE! It looks like you didn't construct CompactFilterDb correctly") def golombFilter: GolombFilter = filterType match { case FilterType.Basic => BlockFilter.fromBytes(bytes, blockHashBE.flip) } + + override def toString: String = { + s"CompactFilterDb(hashBE=$hashBE,filterType=$filterType,height=$height,blockHashBE=$blockHashBE,bytes=${bytes})" + } } object CompactFilterDbHelper { diff --git a/core/src/main/scala/org/bitcoins/core/api/ChainQueryApi.scala b/core/src/main/scala/org/bitcoins/core/api/ChainQueryApi.scala index 61248dfa95..ede0c15532 100644 --- a/core/src/main/scala/org/bitcoins/core/api/ChainQueryApi.scala +++ b/core/src/main/scala/org/bitcoins/core/api/ChainQueryApi.scala @@ -32,6 +32,7 @@ trait ChainQueryApi { def getFiltersBetweenHeights( startHeight: Int, endHeight: Int): Future[Vector[FilterResponse]] + } object ChainQueryApi { diff --git a/core/src/main/scala/org/bitcoins/core/gcs/FilterHeader.scala b/core/src/main/scala/org/bitcoins/core/gcs/FilterHeader.scala index 03c7b37bd7..0493aa72dc 100644 --- a/core/src/main/scala/org/bitcoins/core/gcs/FilterHeader.scala +++ b/core/src/main/scala/org/bitcoins/core/gcs/FilterHeader.scala @@ -1,6 +1,6 @@ package org.bitcoins.core.gcs -import org.bitcoins.core.crypto.DoubleSha256Digest +import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.core.util.CryptoUtil /** @@ -25,4 +25,17 @@ case class FilterHeader( def nextHeader(nextFilterHash: DoubleSha256Digest): FilterHeader = { FilterHeader(filterHash = nextFilterHash, prevHeaderHash = this.hash) } + + override def toString: String = { + s"FilterHeader(hashBE=${hash.flip},filterHashBE=${filterHash.flip.hex},prevHeaderHashBE=${prevHeaderHash.flip.hex})" + } +} + +object FilterHeader { + + def apply( + filterHash: DoubleSha256DigestBE, + prevHeaderHash: DoubleSha256DigestBE): FilterHeader = { + new FilterHeader(filterHash.flip, prevHeaderHash.flip) + } } diff --git a/core/src/main/scala/org/bitcoins/core/gcs/GolombFilter.scala b/core/src/main/scala/org/bitcoins/core/gcs/GolombFilter.scala index d797b269fc..d1379d2fde 100644 --- a/core/src/main/scala/org/bitcoins/core/gcs/GolombFilter.scala +++ b/core/src/main/scala/org/bitcoins/core/gcs/GolombFilter.scala @@ -1,6 +1,6 @@ package org.bitcoins.core.gcs -import org.bitcoins.core.crypto.DoubleSha256Digest +import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.core.number.{UInt64, UInt8} import org.bitcoins.core.protocol.{CompactSizeUInt, NetworkElement} import org.bitcoins.core.util.CryptoUtil @@ -26,6 +26,10 @@ case class GolombFilter( CryptoUtil.doubleSHA256(this.bytes) } + lazy val hashBE: DoubleSha256DigestBE = { + hash.flip + } + /** Given the previous FilterHeader, constructs the header corresponding to this */ def getHeader(prevHeader: FilterHeader): FilterHeader = { FilterHeader(filterHash = this.hash, prevHeaderHash = prevHeader.hash) diff --git a/docs/applications/chain.md b/docs/applications/chain.md index 24d3cacc65..30f9a25f52 100644 --- a/docs/applications/chain.md +++ b/docs/applications/chain.md @@ -43,14 +43,14 @@ val rpcCli = BitcoindRpcClient(bitcoindInstance) // Next, we need to create a way to monitor the chain: -val getBestBlockHash = ChainTestUtil.bestBlockHashFnRpc(Future.successful(rpcCli)) +val getBestBlockHash = SyncUtil.getBestBlockHashFunc(rpcCli) -val getBlockHeader = ChainTestUtil.getBlockHeaderFnRpc(Future.successful(rpcCli)) +val getBlockHeader = SyncUtil.getBlockHeaderFunc(rpcCli) // set a data directory val datadir = Files.createTempDirectory("bitcoin-s-test") -// set the currenet network to regtest +// set the current network to regtest import com.typesafe.config.ConfigFactory val config = ConfigFactory.parseString { """ @@ -68,9 +68,11 @@ val blockHeaderDAO = BlockHeaderDAO() val compactFilterHeaderDAO = CompactFilterHeaderDAO() val compactFilterDAO = CompactFilterDAO() -// Now, do the actual syncing: + +//initialize the chain handler from the database val chainHandlerF = ChainHandler.fromDatabase(blockHeaderDAO, compactFilterHeaderDAO, compactFilterDAO) +// Now, do the actual syncing: val syncedChainApiF = for { _ <- chainProjectInitF handler <- chainHandlerF diff --git a/docs/applications/filter-sync.md b/docs/applications/filter-sync.md new file mode 100644 index 0000000000..561c571c2e --- /dev/null +++ b/docs/applications/filter-sync.md @@ -0,0 +1,133 @@ +--- +title: Syncing Blockfilters +id: filter-sync +--- + +The `chain` module has the ability to store [BIP157](https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki) block filters locally. Generally these filters are useful +for doing wallet rescans. The idea is you can generate a list of script pubkeys you are interested in and see if +the block filter matches the scriptPubKey. + +As we demonstrated in [chain.md](chain.md) with block headers, you can sync block filters from an external data source +as well. We are going to use bitcoind as an example of an external data source to sync filters against. It is important +that the bitcoind version you are using is >= `v19` as the [`getblockfilter`](https://github.com/bitcoin/bitcoin/blob/master/doc/release-notes/release-notes-0.19.0.1.md#new-rpcs) +rpc is implemented there. You need to make sure bitcoind is started with the `-blockfilterindex` flag. This makes it +so we can query filters. + +#### Abstract idea of syncing filters. + +Our internal infrastructure depends on one function to be implemented to be able to sync filters. + +```scala mdoc:invisible +import org.bitcoins.core.protocol.blockchain._ +import org.bitcoins.core.gcs._ +import scala.concurrent.Future + +import akka.actor.ActorSystem + +import org.bitcoins.core.gcs._ +import org.bitcoins.core.protocol.blockchain.BlockHeader +import org.bitcoins.chain.blockchain._ +import org.bitcoins.chain.blockchain.sync._ +import org.bitcoins.chain.models._ +import org.bitcoins.chain.config._ + +import org.bitcoins.rpc.client.common._ +import org.bitcoins.testkit.BitcoinSTestAppConfig +import org.bitcoins.testkit.chain._ +import org.bitcoins.testkit.chain.fixture.BitcoindV19ChainHandler + +import scala.concurrent._ + +``` + +```scala mdoc:compile-only +val getFilterFunc: BlockHeader => Future[FilterWithHeaderHash] = ??? +``` + +With `getFilterFunc` given a `BlockHeader` we can find it's associated `GolombFilter` -- which is our internal repesentation +of a BIP157 block filter. + +The basic idea for `FilterSync.syncFilters()` is to look at our current best block header inside of our `ChainApi.getBestBlockHeader()` +and then check what our best block filter's block hash is with `ChainApi.getBestFilterHeader()`. If the blockfilter returned from our internal +data store is NOT associated with our best block header, we attempt to sync our filter headers to catch up to our best block header. + +### Syncing block filters against bitcoind + +We are going to implement `getFilterFunc` with bitcoind and then sync a few filter headers. + +```scala mdoc:compile-only + +implicit val system = ActorSystem(s"filter-sync-example") +implicit val ec = system.dispatcher +implicit val chainAppConfig = BitcoinSTestAppConfig.getNeutrinoTestConfig().chainConf + +//let's use a helper method to get a v19 bitcoind +//instance and a chainApi +val bitcoindWithChainApiF: Future[BitcoindV19ChainHandler] = { + ChainUnitTest.createBitcoindV19ChainHandler() +} +val bitcoindF = bitcoindWithChainApiF.map(_.bitcoind) +val chainApiF = bitcoindWithChainApiF.map(_.chainHandler) + +val filterType = FilterType.Basic +val addressF = bitcoindF.flatMap(_.getNewAddress) + +//this is the function that we are going to use to sync +//our internal filters against. We use this function to query +//for each block filter associated with a blockheader +val getFilterFunc: BlockHeader => Future[FilterWithHeaderHash] = { blockHeader => + val prevFilterResultF = + bitcoindF.flatMap(_.getBlockFilter(blockHeader.hashBE, filterType)) + prevFilterResultF.map { filterResult => + FilterWithHeaderHash(filterResult.filter, filterResult.header) + } +} + +//ok enough setup, let's generate a block that we need to sync the filter for in bitcoind +val block1F = for { + bitcoind <- bitcoindF + address <- addressF + hashes <- bitcoind.generateToAddress(1,address) +} yield hashes + +//to be able to sync filters, we need to make sure our block headers are synced first +//so let's sync our block headers to our internal chainstate +val chainApiSyncedHeadersF = for { + bitcoind <- bitcoindF + handler <- chainApiF + getBestBlockHash = SyncUtil.getBestBlockHashFunc(bitcoind) + getBlockHeader = SyncUtil.getBlockHeaderFunc(bitcoind) + syncedChainApiHeaders <- ChainSync.sync(handler, getBlockHeader, getBestBlockHash) +} yield syncedChainApiHeaders + +//now that we have synced our 1 block header, we can now sync the 1 block filter +//associated with that header. +val chainApiSyncedFiltersF = for { + syncedHeadersChainApi <- chainApiSyncedHeadersF + syncedFilters <- FilterSync.syncFilters(syncedHeadersChainApi,getFilterFunc) +} yield syncedFilters + +//now we should have synced our one filter, let's make sure we have it +val resultF = for { + chainApi <- chainApiSyncedFiltersF + filterHeaderCount <- chainApi.getFilterHeaderCount() + filterCount <- chainApi.getFilterCount() +} yield { + println(s"filterHeaderCount=$filterHeaderCount filterCount=$filterCount") +} + +//cleanup +resultF.onComplete { _ => + for { + c <- bitcoindWithChainApiF + _ <- ChainUnitTest.destroyBitcoindV19ChainApi(c) + _ <- system.terminate() + } yield () +} +``` + +Yay! Now we have synced block filters from an external data source. If you want to repeatedly sync you can just call + +`FilterSync.syncFilters(syncedFiltersChainApi,getFilterFunc)` every time you would like to sync. Again, you need to ensure +your headers are synced before you can sync filters, so make sure that you are calling `ChainSync.sync()` before syncing +filters. \ No newline at end of file diff --git a/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainTestUtil.scala b/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainTestUtil.scala index 0db1eb4217..d5d0da8d5c 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainTestUtil.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainTestUtil.scala @@ -1,17 +1,13 @@ package org.bitcoins.testkit.chain import org.bitcoins.chain.models._ -import org.bitcoins.core.crypto -import org.bitcoins.core.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} +import org.bitcoins.core.crypto.DoubleSha256Digest import org.bitcoins.core.gcs.{BlockFilter, FilterHeader, GolombFilter} import org.bitcoins.core.protocol.blockchain.{ BlockHeader, MainNetChainParams, RegTestNetChainParams } -import org.bitcoins.rpc.client.common.BitcoindRpcClient - -import scala.concurrent.{ExecutionContext, Future} sealed abstract class ChainTestUtil { lazy val regTestChainParams: RegTestNetChainParams.type = @@ -80,20 +76,6 @@ sealed abstract class ChainTestUtil { lazy val blockHeaderDb566496 = BlockHeaderDbHelper.fromBlockHeader(566496, blockHeader566496) } - - /** Creates a best block header function for [[org.bitcoins.chain.blockchain.sync.ChainSync.sync() ChainSync.sync]] */ - def bestBlockHashFnRpc(bitcoindF: Future[BitcoindRpcClient])( - implicit ec: ExecutionContext): () => Future[DoubleSha256DigestBE] = { - () => - bitcoindF.flatMap(_.getBestBlockHash) - } - - /** Creates a getBlocKHeader function for [[org.bitcoins.chain.blockchain.sync.ChainSync.sync() ChainSync.sync]] */ - def getBlockHeaderFnRpc(bitcoindF: Future[BitcoindRpcClient])( - implicit ec: ExecutionContext): DoubleSha256DigestBE => Future[ - BlockHeader] = { hash: crypto.DoubleSha256DigestBE => - bitcoindF.flatMap(_.getBlockHeader(hash).map(_.blockHeader)) - } } object ChainTestUtil extends ChainTestUtil diff --git a/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainUnitTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainUnitTest.scala index 812a2b9773..e2c64b00c9 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainUnitTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/chain/ChainUnitTest.scala @@ -5,13 +5,17 @@ import java.net.InetSocketAddress import akka.actor.ActorSystem import com.typesafe.config.ConfigFactory import org.bitcoins.chain.ChainVerificationLogger +import org.bitcoins.chain.api.ChainApi import org.bitcoins.chain.blockchain.ChainHandler +import org.bitcoins.chain.blockchain.sync.ChainSync import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.chain.db.ChainDbManagement import org.bitcoins.chain.models._ +import org.bitcoins.core.crypto.DoubleSha256DigestBE import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader} import org.bitcoins.db.AppConfig -import org.bitcoins.rpc.client.common.BitcoindRpcClient +import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion} +import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient import org.bitcoins.testkit.chain.fixture._ import org.bitcoins.testkit.fixtures.BitcoinSFixture import org.bitcoins.testkit.rpc.BitcoindRpcTestUtil @@ -211,19 +215,6 @@ trait ChainUnitTest } yield (chainHandler, zmqSubscriber) } - def createChainApiWithBitcoindRpc( - bitcoind: BitcoindRpcClient): Future[BitcoindChainHandlerViaRpc] = { - val handlerWithGenesisHeaderF = - ChainUnitTest.setupHeaderTableWithGenesisHeader() - - val chainHandlerF = handlerWithGenesisHeaderF.map(_._1) - - chainHandlerF.map { handler => - chain.fixture.BitcoindChainHandlerViaRpc(bitcoind, handler) - } - - } - def createBitcoindChainHandlerViaZmq(): Future[BitcoindChainHandlerViaZmq] = { composeBuildersAndWrap(() => BitcoinSFixture.createBitcoind(), createChainHandlerWithBitcoindZmq, @@ -238,19 +229,11 @@ trait ChainUnitTest bitcoindChainHandler.bitcoindRpc, bitcoindChainHandler.chainHandler) - destroyBitcoindChainApiViaRpc(rpc).map { _ => + ChainUnitTest.destroyBitcoindChainApiViaRpc(rpc).map { _ => bitcoindChainHandler.zmqSubscriber.stop } } - def destroyBitcoindChainApiViaRpc( - bitcoindChainHandler: BitcoindChainHandlerViaRpc): Future[Unit] = { - val stopBitcoindF = - BitcoindRpcTestUtil.stopServer(bitcoindChainHandler.bitcoindRpc) - val dropTableF = ChainUnitTest.destroyAllTables() - stopBitcoindF.flatMap(_ => dropTableF) - } - /** * Creates a [[org.bitcoins.rpc.client.common.BitcoindRpcClient BitcoindRpcClient]] that is linked to our [[org.bitcoins.chain.blockchain.ChainHandler ChainHandler]] * via a [[org.bitcoins.zmq.ZMQSubscriber zmq]]. This means messages are passed between bitcoin and our chain handler @@ -273,10 +256,22 @@ trait ChainUnitTest def withBitcoindChainHandlerViaRpc(test: OneArgAsyncTest)( implicit system: ActorSystem): FutureOutcome = { val builder: () => Future[BitcoindChainHandlerViaRpc] = { () => - BitcoinSFixture.createBitcoind().flatMap(createChainApiWithBitcoindRpc) + BitcoinSFixture + .createBitcoind() + .flatMap(ChainUnitTest.createChainApiWithBitcoindRpc) } - makeDependentFixture(builder, destroyBitcoindChainApiViaRpc)(test) + makeDependentFixture(builder, ChainUnitTest.destroyBitcoindChainApiViaRpc)( + test) + } + + def withBitcoindV19ChainHandlerViaRpc(test: OneArgAsyncTest)( + implicit system: ActorSystem): FutureOutcome = { + val builder: () => Future[BitcoindV19ChainHandler] = { () => + ChainUnitTest.createBitcoindV19ChainHandler() + } + makeDependentFixture(builder, ChainUnitTest.destroyBitcoindV19ChainApi)( + test) } } @@ -406,6 +401,54 @@ object ChainUnitTest extends ChainVerificationLogger { } } + def createChainApiWithBitcoindRpc(bitcoind: BitcoindRpcClient)( + implicit ec: ExecutionContext, + chainAppConfig: ChainAppConfig): Future[BitcoindChainHandlerViaRpc] = { + val handlerWithGenesisHeaderF = + ChainUnitTest.setupHeaderTableWithGenesisHeader() + + val chainHandlerF = handlerWithGenesisHeaderF.map(_._1) + + chainHandlerF.map { handler => + chain.fixture.BitcoindChainHandlerViaRpc(bitcoind, handler) + } + } + + def destroyBitcoindChainApiViaRpc( + bitcoindChainHandler: BitcoindChainHandlerViaRpc)( + implicit system: ActorSystem, + chainAppConfig: ChainAppConfig): Future[Unit] = { + import system.dispatcher + val stopBitcoindF = + BitcoindRpcTestUtil.stopServer(bitcoindChainHandler.bitcoindRpc) + val dropTableF = ChainUnitTest.destroyAllTables() + stopBitcoindF.flatMap(_ => dropTableF) + } + + def createBitcoindV19ChainHandler()( + implicit system: ActorSystem, + chainAppConfig: ChainAppConfig): Future[BitcoindV19ChainHandler] = { + import system.dispatcher + val bitcoindV = BitcoindVersion.V19 + BitcoinSFixture + .createBitcoind(Some(bitcoindV)) + .flatMap(createChainApiWithBitcoindRpc) + .map { b: BitcoindChainHandlerViaRpc => + BitcoindV19ChainHandler( + b.bitcoindRpc.asInstanceOf[BitcoindV19RpcClient], + b.chainHandler) + } + } + + def destroyBitcoindV19ChainApi( + bitcoindV19ChainHandler: BitcoindV19ChainHandler)( + implicit system: ActorSystem, + chainAppConfig: ChainAppConfig): Future[Unit] = { + val b = BitcoindChainHandlerViaRpc(bitcoindV19ChainHandler.bitcoind, + bitcoindV19ChainHandler.chainHandler) + destroyBitcoindChainApiViaRpc(b) + } + def destroyBitcoind(bitcoind: BitcoindRpcClient)( implicit system: ActorSystem): Future[Unit] = { BitcoindRpcTestUtil.stopServer(bitcoind) @@ -465,4 +508,20 @@ object ChainUnitTest extends ChainVerificationLogger { } + /** Syncs the given chain handler to the given bitcoind */ + def syncFromBitcoind(bitcoind: BitcoindRpcClient, chainHandler: ChainHandler)( + implicit ec: ExecutionContext, + chainAppConfig: ChainAppConfig): Future[ChainApi] = { + //sync headers + //first we need to implement the 'getBestBlockHashFunc' and 'getBlockHeaderFunc' functions + val getBestBlockHashFunc = { () => + bitcoind.getBestBlockHash + } + + val getBlockHeaderFunc = { hash: DoubleSha256DigestBE => + bitcoind.getBlockHeader(hash).map(_.blockHeader) + } + + ChainSync.sync(chainHandler, getBlockHeaderFunc, getBestBlockHashFunc) + } } diff --git a/testkit/src/main/scala/org/bitcoins/testkit/chain/SyncUtil.scala b/testkit/src/main/scala/org/bitcoins/testkit/chain/SyncUtil.scala new file mode 100644 index 0000000000..86397e7e59 --- /dev/null +++ b/testkit/src/main/scala/org/bitcoins/testkit/chain/SyncUtil.scala @@ -0,0 +1,43 @@ +package org.bitcoins.testkit.chain + +import org.bitcoins.chain.blockchain.sync.FilterWithHeaderHash +import org.bitcoins.core.crypto.DoubleSha256DigestBE +import org.bitcoins.core.gcs.{FilterType, GolombFilter} +import org.bitcoins.core.protocol.blockchain.BlockHeader +import org.bitcoins.rpc.client.common.BitcoindRpcClient +import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient +import org.bitcoins.rpc.jsonmodels.GetBlockFilterResult + +import scala.concurrent.{ExecutionContext, Future} + +/** Useful utilities to use in the chain project for syncing things against bitcoind */ +abstract class SyncUtil { + + /** Creates a function that will retrun bitcoin's best block hash when called */ + def getBestBlockHashFunc( + bitcoind: BitcoindRpcClient): () => Future[DoubleSha256DigestBE] = { () => + bitcoind.getBestBlockHash + } + + /** Creates a function that you can pass a hash to and it returns the block header */ + def getBlockHeaderFunc(bitcoind: BitcoindRpcClient)( + implicit ec: ExecutionContext): DoubleSha256DigestBE => Future[ + BlockHeader] = { hash: DoubleSha256DigestBE => + bitcoind.getBlockHeader(hash).map(_.blockHeader) + } + + /** Creates a function that you can pass a block header to and it's return's it's [[GolombFilter]] */ + def getFilterFunc(bitcoind: BitcoindV19RpcClient, filterType: FilterType)( + implicit ec: ExecutionContext): BlockHeader => Future[ + FilterWithHeaderHash] = { + case header: BlockHeader => + val prevFilterResultF = + bitcoind.getBlockFilter(header.hashBE, filterType) + prevFilterResultF.map { + case GetBlockFilterResult(filter, header) => + FilterWithHeaderHash(filter, header) + } + } +} + +object SyncUtil extends SyncUtil diff --git a/testkit/src/main/scala/org/bitcoins/testkit/chain/fixture/BitcoindChainHandlerViaRpc.scala b/testkit/src/main/scala/org/bitcoins/testkit/chain/fixture/BitcoindChainHandlerViaRpc.scala index 3c69cdb5db..fc45d7212b 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/chain/fixture/BitcoindChainHandlerViaRpc.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/chain/fixture/BitcoindChainHandlerViaRpc.scala @@ -3,7 +3,7 @@ package org.bitcoins.testkit.chain.fixture import org.bitcoins.chain.blockchain.ChainHandler import org.bitcoins.rpc.client.common.BitcoindRpcClient -/** Represents a bitcoind instance paired with a chain handler via zmq */ +/** Represents a bitcoind instance paired with a chain handler via rpc */ case class BitcoindChainHandlerViaRpc( bitcoindRpc: BitcoindRpcClient, chainHandler: ChainHandler) diff --git a/testkit/src/main/scala/org/bitcoins/testkit/fixtures/BitcoinSFixture.scala b/testkit/src/main/scala/org/bitcoins/testkit/fixtures/BitcoinSFixture.scala index 3e16291e0c..1c7be9df9a 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/fixtures/BitcoinSFixture.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/fixtures/BitcoinSFixture.scala @@ -150,10 +150,11 @@ object BitcoinSFixture { /** Creates a new bitcoind instance */ def createBitcoind(versionOpt: Option[BitcoindVersion] = None)( implicit system: ActorSystem): Future[BitcoindRpcClient] = { - import system.dispatcher val instance = BitcoindRpcTestUtil.instance(versionOpt = versionOpt) - val bitcoind = BitcoindRpcClient.withActorSystem(instance) - - bitcoind.start().map(_ => bitcoind) + val bitcoind = versionOpt match { + case Some(v) => BitcoindRpcClient.fromVersion(v, instance) + case None => new BitcoindRpcClient(instance) + } + bitcoind.start() } } diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala index 3a324b4e78..48402ab21f 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/RescanHandling.scala @@ -49,7 +49,9 @@ private[wallet] trait RescanHandling extends WalletLogger { addressBatchSize: Int): Future[Unit] = { for { scriptPubKeys <- generateScriptPubKeys(addressBatchSize) - blocks <- matchBlocks(scriptPubKeys, endOpt, startOpt) + blocks <- matchBlocks(scriptPubKeys = scriptPubKeys, + endOpt = endOpt, + startOpt = startOpt) _ <- downloadAndProcessBlocks(blocks) externalGap <- calcAddressGap(HDChainType.External) changeGap <- calcAddressGap(HDChainType.Change) @@ -119,8 +121,10 @@ private[wallet] trait RescanHandling extends WalletLogger { Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors() * 2) val blocksF = for { - blocks <- getMatchingBlocks(scriptPubKeys, startOpt, endOpt)( - ExecutionContext.fromExecutor(threadPool)) + blocks <- getMatchingBlocks( + scripts = scriptPubKeys, + startOpt = startOpt, + endOpt = endOpt)(ExecutionContext.fromExecutor(threadPool)) } yield { blocks.sortBy(_.blockHeight).map(_.blockHash.flip) } diff --git a/website/sidebars.json b/website/sidebars.json index dfb4c59b6a..d9f8eb0abb 100644 --- a/website/sidebars.json +++ b/website/sidebars.json @@ -22,6 +22,7 @@ ], "Applications": [ "applications/chain", + "applications/filter-sync", "applications/cli", "applications/configuration", "applications/dlc",