Mirror of https://github.com/bitcoin-s/bitcoin-s.git, synced 2025-03-13 19:37:30 +01:00
2020 11 13 issue 2258 (#2260)
* Introduce ChainHandlerCached, which behaves like the old ChainHandler. Now ChainHandler.getBestBlockHeader() will read headers from the database
* Remove the ChainHandler.blockchains field; it is now only available in ChainHandlerCached
* De-futurify ChainHandler.fromDatabase()
* Adjust logging
* Patch test case
* Use BlockHeaderDAO.chainTips when getting the best header rather than BlockHeaderDAO.getBlockchains(). Implement a helper method ChainHandler.toChainHandlerCached()
* Fix chain.md, wallet.md
* Make ChainHandler.getBestBlockHeader() consider the time of a header if the chainwork is the same. Make test cases less strict about which header is the best header when both chainwork and time are the same on the header
* Only execute callbacks on headers that are going to be created in the database, not all headers passed into ChainHandler.processHeadersWithChains()
* Turn up log level again
* Small optimizations: check whether we have already seen a header before processing it in ChainHandler.processHeadersWithChains(). Fix FilterSyncMarker.toString(). Use ChainHandlerCached in Node
* Remove ChainHandlerCached in appServer, re-add it in Node.scala
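The best-header rule this commit introduces can be summarized in a small sketch: pick the tip with the most accumulated chainwork, and on a chainwork tie prefer the oldest tip time. This is illustrative only; the hypothetical Tip case class stands in for the real BlockHeaderDb/Blockchain types in the diff below.

// Illustrative sketch of the tie-breaking rule; Tip is a stand-in type,
// not an actual bitcoin-s class.
case class Tip(hashHex: String, chainWork: BigInt, time: Long)

def bestTip(tips: Vector[Tip]): Tip = {
  // the tip with the most accumulated proof of work wins
  val maxWork = tips.map(_.chainWork).max
  val candidates = tips.filter(_.chainWork == maxWork)
  // on a chainwork tie, prefer the oldest tip: it likely propagated
  // first and had more miners building on top of it
  candidates.minBy(_.time)
}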
This commit is contained in:
parent 18dfbed8c9
commit 2de17bb4e4

23 changed files with 495 additions and 215 deletions
@@ -246,11 +246,11 @@ class BitcoinSServerMain(override val args: Array[String])
system: ActorSystem): Future[ChainApi] = {
val blockEC =
system.dispatchers.lookup(Dispatchers.DefaultBlockingDispatcherId)
val chainApi = ChainHandler.fromDatabase(
blockHeaderDAO = BlockHeaderDAO()(blockEC, chainAppConfig),
CompactFilterHeaderDAO()(blockEC, chainAppConfig),
CompactFilterDAO()(blockEC, chainAppConfig))
for {
chainApi <- ChainHandler.fromDatabase(
blockHeaderDAO = BlockHeaderDAO()(blockEC, chainAppConfig),
CompactFilterHeaderDAO()(blockEC, chainAppConfig),
CompactFilterDAO()(blockEC, chainAppConfig))
isMissingChainWork <- chainApi.isMissingChainWork
chainApiWithWork <-
if (isMissingChainWork || force) {
@@ -0,0 +1,41 @@
package org.bitcoins.chain.blockchain

import org.bitcoins.testkit.chain.{ChainDbUnitTest, ChainUnitTest}
import org.bitcoins.testkit.chain.fixture.ChainFixtureTag
import org.scalatest.FutureOutcome

class ChainHandlerCachedTest extends ChainDbUnitTest {
override type FixtureParam = ChainHandlerCached

override val defaultTag: ChainFixtureTag =
ChainFixtureTag.GenesisChainHandlerCachedWithFilter

override def withFixture(test: OneArgAsyncTest): FutureOutcome =
withChainHandlerCachedGenesisFilter(test)

behavior of "ChainHandlerCached"

it must "throw an error when we have no chains" in {
chainHandlerCached: ChainHandlerCached =>
val handler = chainHandlerCached.copy(blockchains = Vector.empty)

recoverToSucceededIf[RuntimeException] {
handler
.getBestBlockHeader()
}
}

it must "get best filter header with zero blockchains in memory" in {
chainHandlerCached: ChainHandlerCached =>
val noChainsChainHandler =
chainHandlerCached.copy(blockchains = Vector.empty)

for {
filterHeaderOpt <- noChainsChainHandler.getBestFilterHeader()
} yield {
assert(filterHeaderOpt.isDefined)
assert(filterHeaderOpt.get == ChainUnitTest.genesisFilterHeaderDb)
}
}

}
@@ -24,7 +24,7 @@ import org.bitcoins.testkit.chain.{
ChainUnitTest
}
import org.bitcoins.testkit.util.FileUtil
import org.scalatest.FutureOutcome
import org.scalatest.{Assertion, FutureOutcome}
import play.api.libs.json.Json

import scala.concurrent.{Future, Promise}

@@ -52,25 +52,6 @@ class ChainHandlerTest extends ChainDbUnitTest {
nonce = UInt32(2083236893)
)

it must "throw an error when we have no chains" in {
chainHandler: ChainHandler =>
val handler = chainHandler.copy(blockchains = Vector.empty)

recoverToSucceededIf[RuntimeException] {
handler.getBestBlockHeader()
}
}

it must "throw an error when we have no headers" in {
chainHandler: ChainHandler =>
val handler =
chainHandler.copy(blockchains = Vector(Blockchain(Vector.empty)))

recoverToSucceededIf[RuntimeException] {
handler.getBestBlockHeader()
}
}

it must "process a new valid block header, and then be able to fetch that header" in {
chainHandler: ChainHandler =>
val newValidHeader =

@@ -127,10 +108,13 @@ class ChainHandlerTest extends ChainDbUnitTest {
// check that header B is the leader
val assertBBestHashF = for {
chainHandler <- chainHandlerCF
headerB <- newHeaderBF
newHeaderB <- newHeaderBF
bestHash <- chainHandler.getBestBlockHash()
newHeaderC <- newHeaderCF
} yield {
assert(bestHash == headerB.hashBE)
checkReorgHeaders(header1 = newHeaderB,
header2 = newHeaderC,
bestHash = bestHash)
}

// build a new header D off of C which was seen later

@@ -378,13 +362,16 @@ class ChainHandlerTest extends ChainDbUnitTest {
val assert1F = for {
chainHandler <- chainHandlerF
newHeaderB <- newHeaderBF
newHeaderC <- newHeaderCF
blockHeaderBatchOpt <- chainHandler.nextBlockHeaderBatchRange(
prevStopHash = ChainTestUtil.regTestGenesisHeaderDb.hashBE,
batchSize = batchSize)
} yield {
assert(blockHeaderBatchOpt.isDefined)
val marker = blockHeaderBatchOpt.get
assert(newHeaderB.hash == marker.stopBlockHash)
checkReorgHeaders(header1 = newHeaderB,
header2 = newHeaderC,
bestHash = marker.stopBlockHash.flip)
assert(newHeaderB.height == marker.startHeight)
}

@@ -564,18 +551,6 @@ class ChainHandlerTest extends ChainDbUnitTest {
}
}

it must "get best filter header with zero blockchains in memory" in {
chainHandler: ChainHandler =>
val noChainsChainHandler = chainHandler.copy(blockchains = Vector.empty)

for {
filterHeaderOpt <- noChainsChainHandler.getBestFilterHeader()
} yield {
assert(filterHeaderOpt.isDefined)
assert(filterHeaderOpt.get == ChainUnitTest.genesisFilterHeaderDb)
}
}

it must "fail when processing duplicate filters" in {
chainHandler: ChainHandler =>
recoverToSucceededIf[DuplicateFilters] {

@@ -634,4 +609,21 @@ class ChainHandlerTest extends ChainDbUnitTest {
result <- resultP.future
} yield assert(result)
}

/** Checks that
* 1. The header1 & header2 have the same chainwork
* 2. Checks that header1 and header2 have the same time
* 3. Checks bestHash is one of header1.hashBE or header2.hashBE
*/
private def checkReorgHeaders(
header1: BlockHeaderDb,
header2: BlockHeaderDb,
bestHash: DoubleSha256DigestBE): Assertion = {
assert(header1.chainWork == header2.chainWork)
assert(header1.time == header2.time)
//if both chainwork and time are the same, we are left to
//how the database serves up the data
//just make sure it is one of the two headers
assert(Vector(header1.hashBE, header2.hashBE).contains(bestHash))
}
}
@@ -1,9 +1,11 @@
package org.bitcoins.chain.blockchain

import org.bitcoins.chain
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.pow.Pow
import org.bitcoins.core.api.chain.db.{BlockHeaderDb, BlockHeaderDbHelper}
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.testkit.chain.{
ChainDbUnitTest,
ChainTestUtil,

@@ -19,7 +21,7 @@ import scala.io.BufferedSource

class MainnetChainHandlerTest extends ChainDbUnitTest {

override type FixtureParam = ChainHandler
override type FixtureParam = ChainHandlerCached

override val defaultTag: ChainFixtureTag = ChainFixtureTag.GenisisChainHandler

@@ -37,12 +39,12 @@ class MainnetChainHandlerTest extends ChainDbUnitTest {
val genesis: BlockHeaderDb = ChainUnitTest.genesisHeaderDb

override def withFixture(test: OneArgAsyncTest): FutureOutcome =
withChainHandler(test)
withChainHandlerCached(test)

behavior of "MainnetChainHandler"

it must "benchmark ChainHandler.processHeaders()" in {
chainHandler: ChainHandler =>
chainHandler: ChainHandlerCached =>
val blockHeaders =
headersResult.drop(
ChainUnitTest.FIRST_POW_CHANGE - ChainUnitTest.FIRST_BLOCK_HEIGHT)

@@ -98,7 +100,7 @@ class MainnetChainHandlerTest extends ChainDbUnitTest {
}

it must "have getBestBlockHash return the header with the most work, not the highest" in {
tempHandler: ChainHandler =>
tempHandler: ChainHandlerCached =>
val dummyHeader =
BlockHeaderDbHelper.fromBlockHeader(1,
BigInt(0),

@@ -129,7 +131,7 @@ class MainnetChainHandlerTest extends ChainDbUnitTest {
}

it must "be able to process and fetch real headers from mainnet" in {
chainHandler: ChainHandler =>
chainHandler: ChainHandlerCached =>
val blockHeaders =
headersResult.drop(
ChainUnitTest.FIRST_POW_CHANGE - ChainUnitTest.FIRST_BLOCK_HEIGHT)

@@ -161,10 +163,11 @@ class MainnetChainHandlerTest extends ChainDbUnitTest {

createdF.flatMap { _ =>
val blockchain = Blockchain.fromHeaders(firstThreeBlocks.reverse)
val handler = ChainHandler(chainHandler.blockHeaderDAO,
chainHandler.filterHeaderDAO,
chainHandler.filterDAO,
blockchain)
val handler = ChainHandlerCached(chainHandler.blockHeaderDAO,
chainHandler.filterHeaderDAO,
chainHandler.filterDAO,
Vector(blockchain),
Map.empty)
val processorF = Future.successful(handler)
// Takes way too long to do all blocks
val blockHeadersToTest = blockHeaders.tail

@@ -177,44 +180,46 @@ class MainnetChainHandlerTest extends ChainDbUnitTest {
}
}

it must "properly recalculate chain work" in { tempHandler: ChainHandler =>
val headersWithNoWork = Vector(
BlockHeaderDbHelper.fromBlockHeader(3,
BigInt(0),
ChainTestUtil.blockHeader562464),
BlockHeaderDbHelper.fromBlockHeader(2,
BigInt(0),
ChainTestUtil.blockHeader562463),
BlockHeaderDbHelper.fromBlockHeader(1,
BigInt(0),
ChainTestUtil.blockHeader562462)
)
it must "properly recalculate chain work" in {
tempHandler: ChainHandlerCached =>
val headersWithNoWork = Vector(
BlockHeaderDbHelper.fromBlockHeader(3,
BigInt(0),
ChainTestUtil.blockHeader562464),
BlockHeaderDbHelper.fromBlockHeader(2,
BigInt(0),
ChainTestUtil.blockHeader562463),
BlockHeaderDbHelper.fromBlockHeader(1,
BigInt(0),
ChainTestUtil.blockHeader562462)
)

val noWorkGenesis = genesis.copy(chainWork = BigInt(0))
val noWorkGenesis = genesis.copy(chainWork = BigInt(0))

val blockchain =
Blockchain(headersWithNoWork :+ noWorkGenesis)
val blockchain =
Blockchain(headersWithNoWork :+ noWorkGenesis)

val chainHandler = tempHandler.copy(blockchains = Vector(blockchain))
val chainHandler = tempHandler.copy(blockchains = Vector(blockchain))

for {
_ <- chainHandler.blockHeaderDAO.update(noWorkGenesis)
_ <- chainHandler.blockHeaderDAO.createAll(headersWithNoWork)
lowestNoWork <- chainHandler.blockHeaderDAO.getLowestNoWorkHeight
_ = assert(lowestNoWork == 0)
isMissingWork <- chainHandler.isMissingChainWork
_ = assert(isMissingWork)
newHandler <- chainHandler.recalculateChainWork
headerDb <- newHandler.getBestBlockHeader()
} yield {
assert(headerDb.height == headersWithNoWork.head.height)
assert(
newHandler.blockchains.head
.groupBy(_.hashBE)
.forall(_._2.size == 1))
assert(headerDb.hashBE == headersWithNoWork.head.hashBE)
assert(headerDb.chainWork == BigInt(12885098501L))
}
for {
_ <- chainHandler.blockHeaderDAO.update(noWorkGenesis)
_ <- chainHandler.blockHeaderDAO.createAll(headersWithNoWork)
lowestNoWork <- chainHandler.blockHeaderDAO.getLowestNoWorkHeight
_ = assert(lowestNoWork == 0)
isMissingWork <- chainHandler.isMissingChainWork
_ = assert(isMissingWork)
newHandler <- chainHandler.recalculateChainWork
blockchains <- chainHandler.blockHeaderDAO.getBlockchains()
headerDb <- newHandler.getBestBlockHeader()
} yield {
assert(headerDb.height == headersWithNoWork.head.height)
val grouped = blockchains.head.groupBy(_.hashBE)
assert(
grouped
.forall(_._2.size == 1))
assert(headerDb.hashBE == headersWithNoWork.head.hashBE)
assert(headerDb.chainWork == BigInt(12885098501L))
}
}

}
@@ -114,7 +114,7 @@ private[blockchain] trait BaseBlockChainCompObject
findPrevBlockHeaderIdx(header, blockchain) match {
case None =>
logger.warn(
s"No common ancestor found in the chain to connect to ${header.hashBE}")
s"No common ancestor found in the chain with tip=${blockchain.tip.hashBE.hex} to connect to hash=${header.hashBE} prevHash=${header.previousBlockHashBE}")
val err = TipUpdateResult.BadPreviousBlockHash(header)
val failed = ConnectTipResult.BadTip(err)
failed
@@ -23,20 +23,22 @@ import scala.concurrent._
* of [[ChainApi ChainApi]], this is the entry point in to the
* chain project.
*
* This implementation of [[ChainApi]] reads all values directly from the database. If you want an optimized version
* that caches headers locally please see [[ChainHandlerCached]]
*
* @param blockHeaderDAO block header DB
* @param filterHeaderDAO filter header DB
* @param filterDAO filter DB
* @param blockchains current blockchains
* @param blockFilterCheckpoints compact filter checkpoints for filter header verification in form of a map (block header hash -> filter header hash)
* @param chainConfig config file
*/
case class ChainHandler(
blockHeaderDAO: BlockHeaderDAO,
filterHeaderDAO: CompactFilterHeaderDAO,
filterDAO: CompactFilterDAO,
blockchains: Vector[Blockchain],
blockFilterCheckpoints: Map[DoubleSha256DigestBE, DoubleSha256DigestBE])(
implicit
class ChainHandler(
val blockHeaderDAO: BlockHeaderDAO,
val filterHeaderDAO: CompactFilterHeaderDAO,
val filterDAO: CompactFilterDAO,
val blockFilterCheckpoints: Map[
DoubleSha256DigestBE,
DoubleSha256DigestBE])(implicit
val chainConfig: ChainAppConfig,
executionContext: ExecutionContext)
extends ChainApi
@@ -51,28 +53,44 @@ case class ChainHandler(
}
}

override def getBestBlockHeader(): Future[BlockHeaderDb] = {
Future {
logger.debug(s"Querying for best block hash")
//https://bitcoin.org/en/glossary/block-chain
val groupedChains = blockchains.groupBy(_.tip.chainWork)
val maxWork = groupedChains.keys.max
val chains = groupedChains(maxWork)
/** Given a set of blockchains, determines which one has the best header */
protected def getBestBlockHeaderHelper(
chains: Vector[Blockchain]): BlockHeaderDb = {
logger.debug(
s"Finding best block hash out of chains.length=${chains.length}")
//https://bitcoin.org/en/glossary/block-chain
val groupedChains = chains.groupBy(_.tip.chainWork)
val maxWork = groupedChains.keys.max
val chainsByWork = groupedChains(maxWork)

val bestHeader: BlockHeaderDb = chains match {
case Vector() =>
// This should never happen
val errMsg = s"Did not find blockchain with work $maxWork"
logger.error(errMsg)
throw new RuntimeException(errMsg)
case chain +: Vector() =>
chain.tip
case chain +: rest =>
logger.warn(
s"We have multiple competing blockchains: ${(chain +: rest).map(_.tip.hashBE.hex).mkString(", ")}")
chain.tip
}
bestHeader
val bestHeader: BlockHeaderDb = chainsByWork match {
case Vector() =>
// This should never happen
val errMsg = s"Did not find blockchain with work $maxWork"
logger.error(errMsg)
throw new RuntimeException(errMsg)
case chain +: Vector() =>
chain.tip
case chain +: rest =>
logger.warn(
s"We have multiple competing blockchains with same work, selecting by time: ${(chain +: rest)
.map(_.tip.hashBE.hex)
.mkString(", ")}")
//since we have same chainwork, just take the oldest tip
//as that's "more likely" to have been propagated first
//and had more miners building on top of it
chainsByWork.sortBy(_.tip.time).head.tip
}
bestHeader
}

override def getBestBlockHeader(): Future[BlockHeaderDb] = {
val tipsF: Future[Vector[BlockHeaderDb]] = blockHeaderDAO.chainTips
for {
tips <- tipsF
chains = tips.map(t => Blockchain.fromHeaders(Vector(t)))
} yield {
getBestBlockHeaderHelper(chains)
}
}

@@ -90,41 +108,53 @@ case class ChainHandler(
}
}

/** @inheritdoc */
override def processHeaders(
headers: Vector[BlockHeader]): Future[ChainApi] = {
protected def processHeadersWithChains(
headers: Vector[BlockHeader],
blockchains: Vector[Blockchain]): Future[ChainApi] = {
if (headers.isEmpty) {
Future.successful(this)
} else {
val headersWeAlreadyHave = blockchains.flatMap(_.headers)

//if we already have the header don't process it again
val filteredHeaders = headers.filterNot(h =>
headersWeAlreadyHave.exists(_.hashBE == h.hashBE))

val blockchainUpdates: Vector[BlockchainUpdate] = {
Blockchain.connectHeadersToChains(headers, blockchains)
Blockchain.connectHeadersToChains(headers = filteredHeaders,
blockchains = blockchains)
}

val successfullyValidatedHeaders = blockchainUpdates
.flatMap(_.successfulHeaders)

val headersToBeCreated = {
// During reorgs, we can be sent a header twice
blockchainUpdates
.flatMap(_.successfulHeaders)
.distinct
.filterNot(blockchains.flatMap(_.headers).contains)
successfullyValidatedHeaders.distinct
}

val chains = blockchainUpdates.map(_.blockchain)

val createdF = blockHeaderDAO.createAll(headersToBeCreated)

val newChainHandler = this.copy(blockchains = chains)
val newChainHandler = ChainHandler(blockHeaderDAO,
filterHeaderDAO,
filterDAO,
blockFilterCheckpoints =
blockFilterCheckpoints)

createdF.map { headers =>
if (chainConfig.chainCallbacks.onBlockHeaderConnected.nonEmpty) {
headers.reverseIterator.foldLeft(FutureUtil.unit) { (acc, header) =>
for {
_ <- acc
_ <-
chainConfig.chainCallbacks
.executeOnBlockHeaderConnectedCallbacks(logger,
header.height,
header.blockHeader)
} yield ()
headersToBeCreated.reverseIterator.foldLeft(FutureUtil.unit) {
(acc, header) =>
for {
_ <- acc
_ <-
chainConfig.chainCallbacks
.executeOnBlockHeaderConnectedCallbacks(logger,
header.height,
header.blockHeader)
} yield ()
}
}
chains.foreach { c =>
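The net effect of the hunk above: connected-header callbacks now iterate only over headersToBeCreated, the headers actually being inserted into the database. A condensed sketch of that loop, assuming createdF, headersToBeCreated, chainConfig, and logger are in scope exactly as in the diff:

// Condensed sketch of the new callback loop (not a verbatim excerpt).
createdF.map { _ =>
  if (chainConfig.chainCallbacks.onBlockHeaderConnected.nonEmpty) {
    // fire callbacks oldest-first, chaining the futures sequentially
    headersToBeCreated.reverseIterator.foldLeft(FutureUtil.unit) {
      (acc, header) =>
        acc.flatMap { _ =>
          chainConfig.chainCallbacks.executeOnBlockHeaderConnectedCallbacks(
            logger,
            header.height,
            header.blockHeader)
        }
    }
  }
}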
@@ -136,6 +166,17 @@ case class ChainHandler(
}
}

/** @inheritdoc */
override def processHeaders(
headers: Vector[BlockHeader]): Future[ChainApi] = {
val blockchainsF = blockHeaderDAO.getBlockchains()
for {
blockchains <- blockchainsF
newChainApi <-
processHeadersWithChains(headers = headers, blockchains = blockchains)
} yield newChainApi
}

/**
* @inheritdoc
*/
@@ -143,19 +184,19 @@ case class ChainHandler(
getBestBlockHeader().map(_.hashBE)
}

/** @inheritdoc */
override def nextBlockHeaderBatchRange(
protected def nextBlockHeaderBatchRangeWithChains(
prevStopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[FilterSyncMarker]] = {
batchSize: Int,
blockchains: Vector[Blockchain]): Future[Option[FilterSyncMarker]] = {
for {
prevBlockHeaderOpt <- getHeader(prevStopHash)
headerOpt <- prevBlockHeaderOpt match {
case Some(_) =>
findNextHeader(prevBlockHeaderOpt, batchSize)
findNextHeader(prevBlockHeaderOpt, batchSize, blockchains)
case None =>
if (prevStopHash == DoubleSha256DigestBE.empty) {
for {
next <- findNextHeader(None, batchSize)
next <- findNextHeader(None, batchSize, blockchains)
} yield next
} else {
Future.successful(None)
@@ -166,12 +207,26 @@ case class ChainHandler(
}
}

/** @inheritdoc */
override def nextBlockHeaderBatchRange(
prevStopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[FilterSyncMarker]] = {
val blockchainsF = blockHeaderDAO.getBlockchains()
for {
blockchains <- blockchainsF
syncMarkerOpt <- nextBlockHeaderBatchRangeWithChains(prevStopHash,
batchSize,
blockchains)
} yield syncMarkerOpt
}

/** Finds the next header in the chain. Uses chain work to break ties
* returning only the header in the chain with the most work
*/
private def findNextHeader(
prevBlockHeaderOpt: Option[BlockHeaderDb],
batchSize: Int): Future[Option[FilterSyncMarker]] = {
batchSize: Int,
blockchains: Vector[Blockchain]): Future[Option[FilterSyncMarker]] = {

val chainsF = prevBlockHeaderOpt match {
case None =>
@@ -420,7 +475,6 @@ case class ChainHandler(
override def processCheckpoints(
checkpoints: Vector[DoubleSha256DigestBE],
blockHash: DoubleSha256DigestBE): Future[ChainApi] = {

val blockHeadersF: Future[Seq[BlockHeaderDb]] = Future
.traverse(checkpoints.indices.toVector) { i =>
blockHeaderDAO.getAtHeight(i * 1000)
@@ -438,9 +492,11 @@ case class ChainHandler(
res.updated(blockHeader.hashBE, filterHeaderHash)
}

this.copy(blockFilterCheckpoints = updatedCheckpoints)
ChainHandler(blockHeaderDAO = blockHeaderDAO,
filterHeaderDAO = filterHeaderDAO,
filterDAO = filterDAO,
blockFilterCheckpoints = updatedCheckpoints)
}

}

/** @inheritdoc */
@@ -471,8 +527,9 @@ case class ChainHandler(
height: Int): Future[Vector[CompactFilterHeaderDb]] =
filterHeaderDAO.getAtHeight(height)

/** @inheritdoc */
override def getBestFilterHeader(): Future[Option[CompactFilterHeaderDb]] = {
protected def getBestFilterHeaderWithChains(
blockchains: Vector[Blockchain]): Future[
Option[CompactFilterHeaderDb]] = {
val bestFilterHeadersInChain: Future[Option[CompactFilterHeaderDb]] = {
val bestChainOpt = blockchains.maxByOption(_.tip.chainWork)
bestChainOpt match {
@@ -495,6 +552,17 @@ case class ChainHandler(
}
}

/** @inheritdoc */
override def getBestFilterHeader(): Future[Option[CompactFilterHeaderDb]] = {
val blockchainsF = blockHeaderDAO.getBlockchains()
for {
blockchains <- blockchainsF
filterHeaderOpt <- getBestFilterHeaderWithChains(blockchains)
} yield {
filterHeaderOpt
}
}

/**
* This method retrieves the best [[CompactFilterHeaderDb]] from the database
* without any blockchain context, and then uses the [[CompactFilterHeaderDb.blockHashBE]]
@@ -830,10 +898,14 @@ case class ChainHandler(
maxHeight <- maxHeightF
start <- startF
_ <- runRecalculateChainWork(maxHeight, start.head)
newBlockchains <- blockHeaderDAO.getBlockchains()
} yield {
logger.info("Finished calculating chain work")
this.copy(blockchains = newBlockchains)
ChainHandler(
blockHeaderDAO = blockHeaderDAO,
filterHeaderDAO = filterHeaderDAO,
filterDAO = filterDAO,
blockFilterCheckpoints = blockFilterCheckpoints
)
}

resultF.failed.foreach { err =>
@@ -850,10 +922,40 @@ case class ChainHandler(
val genesisWithWork = genesisHeader.copy(chainWork = expectedWork)
blockHeaderDAO.update(genesisWithWork)
}

def copyWith(
blockHeaderDAO: BlockHeaderDAO = blockHeaderDAO,
filterHeaderDAO: CompactFilterHeaderDAO = filterHeaderDAO,
filterDAO: CompactFilterDAO = filterDAO,
blockFilterCheckpoints: Map[DoubleSha256DigestBE, DoubleSha256DigestBE] =
blockFilterCheckpoints): ChainHandler = {
new ChainHandler(blockHeaderDAO = blockHeaderDAO,
filterHeaderDAO = filterHeaderDAO,
filterDAO = filterDAO,
blockFilterCheckpoints = blockFilterCheckpoints)
}

def toChainHandlerCached: Future[ChainHandlerCached] = {
ChainHandler.toChainHandlerCached(this)
}
}

object ChainHandler {

def apply(
blockHeaderDAO: BlockHeaderDAO,
filterHeaderDAO: CompactFilterHeaderDAO,
filterDAO: CompactFilterDAO,
blockFilterCheckpoints: Map[DoubleSha256DigestBE, DoubleSha256DigestBE])(
implicit
ec: ExecutionContext,
chainAppConfig: ChainAppConfig): ChainHandler = {
new ChainHandler(blockHeaderDAO,
filterHeaderDAO,
filterDAO,
blockFilterCheckpoints)
}

/** Constructs a [[ChainHandler chain handler]] from the state in the database
* This gives us the guaranteed latest state we have in the database
*/
@@ -862,28 +964,38 @@ object ChainHandler {
filterHeaderDAO: CompactFilterHeaderDAO,
filterDAO: CompactFilterDAO)(implicit
ec: ExecutionContext,
chainConfig: ChainAppConfig): Future[ChainHandler] = {
val bestChainsF = blockHeaderDAO.getBlockchains()

bestChainsF.map(chains =>
new ChainHandler(blockHeaderDAO = blockHeaderDAO,
filterHeaderDAO = filterHeaderDAO,
filterDAO = filterDAO,
blockchains = chains,
blockFilterCheckpoints = Map.empty))
chainConfig: ChainAppConfig): ChainHandler = {
new ChainHandler(blockHeaderDAO = blockHeaderDAO,
filterHeaderDAO = filterHeaderDAO,
filterDAO = filterDAO,
blockFilterCheckpoints = Map.empty)
}

def apply(
blockHeaderDAO: BlockHeaderDAO,
filterHeaderDAO: CompactFilterHeaderDAO,
filterDAO: CompactFilterDAO,
blockchains: Blockchain)(implicit
filterDAO: CompactFilterDAO)(implicit
ec: ExecutionContext,
chainConfig: ChainAppConfig): ChainHandler = {
new ChainHandler(blockHeaderDAO = blockHeaderDAO,
filterHeaderDAO = filterHeaderDAO,
filterDAO = filterDAO,
blockchains = Vector(blockchains),
blockFilterCheckpoints = Map.empty)
}

/** Converts a [[ChainHandler]] to [[ChainHandlerCached]] by calling [[BlockHeaderDAO.getBlockchains()]] */
def toChainHandlerCached(chainHandler: ChainHandler)(implicit
ec: ExecutionContext): Future[ChainHandlerCached] = {
val blockchainsF = chainHandler.blockHeaderDAO.getBlockchains()
for {
blockchains <- blockchainsF
cached = ChainHandlerCached(
blockHeaderDAO = chainHandler.blockHeaderDAO,
filterHeaderDAO = chainHandler.filterHeaderDAO,
filterDAO = chainHandler.filterDAO,
blockchains = blockchains,
blockFilterCheckpoints = chainHandler.blockFilterCheckpoints
)(chainHandler.chainConfig, ec)
} yield cached
}
}
@@ -0,0 +1,78 @@
package org.bitcoins.chain.blockchain

import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.{
BlockHeaderDAO,
CompactFilterDAO,
CompactFilterHeaderDAO
}
import org.bitcoins.core.api.chain.{ChainApi, FilterSyncMarker}
import org.bitcoins.core.api.chain.db.{BlockHeaderDb, CompactFilterHeaderDb}
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.crypto.DoubleSha256DigestBE

import scala.concurrent.{ExecutionContext, Future}

/** An optimized version of [[ChainHandler]] that avoids database reads
* for determining what the best block header is. This should be used
* with care as it is possible the cached [[blockchains]] may be out of date!
* Unless you know what you are doing, you should probably use [[ChainHandler]]
*/
case class ChainHandlerCached(
override val blockHeaderDAO: BlockHeaderDAO,
override val filterHeaderDAO: CompactFilterHeaderDAO,
override val filterDAO: CompactFilterDAO,
blockchains: Vector[Blockchain],
override val blockFilterCheckpoints: Map[
DoubleSha256DigestBE,
DoubleSha256DigestBE])(implicit
override val chainConfig: ChainAppConfig,
executionContext: ExecutionContext)
extends ChainHandler(blockHeaderDAO,
filterHeaderDAO,
filterDAO,
blockFilterCheckpoints) {

/** Gets the best block header from the given [[blockchains]] parameter */
override def getBestBlockHeader(): Future[BlockHeaderDb] = {
Future {
getBestBlockHeaderHelper(blockchains)
}
}

override def processHeaders(
headers: Vector[BlockHeader]): Future[ChainApi] = {
processHeadersWithChains(headers = headers, blockchains = blockchains)
}

override def getBestFilterHeader(): Future[Option[CompactFilterHeaderDb]] = {
getBestFilterHeaderWithChains(blockchains)
}

override def nextBlockHeaderBatchRange(
prevStopHash: DoubleSha256DigestBE,
batchSize: Int): Future[Option[FilterSyncMarker]] = {
nextBlockHeaderBatchRangeWithChains(prevStopHash = prevStopHash,
batchSize = batchSize,
blockchains = blockchains)
}
}

object ChainHandlerCached {

def fromDatabase(
blockHeaderDAO: BlockHeaderDAO,
filterHeaderDAO: CompactFilterHeaderDAO,
filterDAO: CompactFilterDAO)(implicit
ec: ExecutionContext,
chainConfig: ChainAppConfig): Future[ChainHandlerCached] = {
val bestChainsF = blockHeaderDAO.getBlockchains()

bestChainsF.map(chains =>
new ChainHandlerCached(blockHeaderDAO = blockHeaderDAO,
filterHeaderDAO = filterHeaderDAO,
filterDAO = filterDAO,
blockchains = chains,
blockFilterCheckpoints = Map.empty))
}
}
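Putting the two handler flavours together, a minimal usage sketch (assuming the three DAOs plus the implicit ExecutionContext and ChainAppConfig are in scope): the plain handler is now constructed synchronously, while the cached variant still returns a Future because it must load the blockchains first.

// Sketch only; blockHeaderDAO, filterHeaderDAO, filterDAO and the implicit
// ExecutionContext/ChainAppConfig are assumed to be in scope.
val handler: ChainHandler =
  ChainHandler.fromDatabase(blockHeaderDAO, filterHeaderDAO, filterDAO)

// load the current blockchains once and cache them in memory
val cachedF: Future[ChainHandlerCached] = handler.toChainHandlerCached

// or build the cached handler directly from the database
val cachedDirectF: Future[ChainHandlerCached] =
  ChainHandlerCached.fromDatabase(blockHeaderDAO, filterHeaderDAO, filterDAO)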
@@ -13,5 +13,5 @@ case class FilterSyncMarker(
stopBlockHash: DoubleSha256Digest) {

override def toString: String =
s"FilterSyncMarker(startHeight = $stopBlockHash, stopBlockHash=${stopBlockHash.flip.hex})"
s"FilterSyncMarker(startHeight=$startHeight, stopBlockHash=${stopBlockHash.flip.hex})"
}
@@ -228,7 +228,7 @@ object BitcoinNetworks extends StringFactory[BitcoinNetwork] {
}

/** Map of magic network bytes to the corresponding network */
def magicToNetwork: Map[ByteVector, NetworkParameters] =
val magicToNetwork: Map[ByteVector, NetworkParameters] =
Map(
MainNet.magicBytes -> MainNet,
TestNet3.magicBytes -> TestNet3,
@@ -121,7 +121,7 @@ sealed trait BlockHeader extends NetworkElement {
*/
lazy val hashBE: DoubleSha256DigestBE = hash.flip

override def bytes: ByteVector = RawBlockHeaderSerializer.write(this)
override lazy val bytes: ByteVector = RawBlockHeaderSerializer.write(this)

override def toString: String = {
s"BlockHeader(hashBE=${hashBE.hex},version=$version," +
@@ -164,6 +164,7 @@ abstract class AppConfig extends StartStopAsync[Unit] with BitcoinSLogger {
val c = DatabaseConfig.forConfig[JdbcProfile](path =
s"bitcoin-s.$moduleName",
config = config)
logger.debug(s"Resolved DB config: ${ConfigOps(c.config).asReadableJson}")
c
} match {
case Success(value) =>
@@ -22,9 +22,6 @@ trait JdbcProfileComponent[+ConfigType <: AppConfig] extends BitcoinSLogger {
slickDbConfig
}

logger.debug(
s"Resolved DB config: ${appConfig.slickDbConfig.config.asReadableJson}")

lazy val profile: JdbcProfile = dbConfig.profile
import profile.api._

@@ -70,17 +70,16 @@ val compactFilterDAO = CompactFilterDAO()


//initialize the chain handler from the database
val chainHandlerF = ChainHandler.fromDatabase(blockHeaderDAO, compactFilterHeaderDAO, compactFilterDAO)
val chainHandler = ChainHandler.fromDatabase(blockHeaderDAO, compactFilterHeaderDAO, compactFilterDAO)

// Now, do the actual syncing:
val syncedChainApiF = for {
_ <- chainProjectInitF
handler <- chainHandlerF
synced <- ChainSync.sync(handler, getBlockHeader, getBestBlockHash)
synced <- ChainSync.sync(chainHandler, getBlockHeader, getBestBlockHash)
} yield synced

val syncResultF = syncedChainApiF.flatMap { chainApi =>
chainApi.getBlockCount.map(count => println(s"chain api blockcount=${count}"))
chainApi.getBlockCount().map(count => println(s"chain api blockcount=${count}"))

rpcCli.getBlockCount.map(count => println(s"bitcoind blockcount=${count}"))
}
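For reference, the updated chain.md sync flow now reads end to end as below; this is a sketch assuming chainProjectInitF, the DAOs, and the getBlockHeader/getBestBlockHash callbacks defined earlier in that document.

// Sketch of the post-change chain.md sync flow; all names come from the doc.
val chainHandler =
  ChainHandler.fromDatabase(blockHeaderDAO, compactFilterHeaderDAO, compactFilterDAO)

val syncedChainApiF = for {
  _ <- chainProjectInitF
  synced <- ChainSync.sync(chainHandler, getBlockHeader, getBestBlockHash)
} yield synced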
@@ -125,7 +125,6 @@ val syncF: Future[ChainApi] = configF.flatMap { _ =>
blockHeaderDAO,
compactFilterHeaderDAO,
compactFilterDAO,
blockchains = Vector.empty,
blockFilterCheckpoints = Map.empty)

ChainSync.sync(chainHandler, getBlockHeaderFunc, getBestBlockHashFunc)

@@ -174,7 +173,7 @@ val balanceF: Future[CurrencyUnit] = for {
wallet <- walletF
(tx, blockhash) <- transactionF
_ <- wallet.processTransaction(tx, blockhash)
balance <- wallet.getBalance
balance <- wallet.getBalance()
} yield balance

balanceF.foreach { balance =>
@@ -125,31 +125,31 @@ class NeutrinoNodeTest extends NodeUnitTest {
val ExpectedCount = 113

def hasBlocksF =
RpcUtil.retryUntilSatisfiedF(
conditionF = () => {
node
.chainApiFromDb()
.flatMap(_.getBlockCount.map(_ == ExpectedCount))
},
interval = 1000.millis)
RpcUtil.retryUntilSatisfiedF(conditionF = () => {
node
.chainApiFromDb()
.flatMap(_.getBlockCount())
.map(_ == ExpectedCount)
},
interval = 1000.millis)

def hasFilterHeadersF =
RpcUtil.retryUntilSatisfiedF(
conditionF = () => {
node
.chainApiFromDb()
.flatMap(_.getFilterHeaderCount.map(_ == ExpectedCount))
},
interval = 1000.millis)
RpcUtil.retryUntilSatisfiedF(conditionF = () => {
node
.chainApiFromDb()
.flatMap(_.getFilterHeaderCount)
.map(_ == ExpectedCount)
},
interval = 1000.millis)

def hasFiltersF =
RpcUtil.retryUntilSatisfiedF(
conditionF = () => {
node
.chainApiFromDb()
.flatMap(_.getFilterCount.map(_ == ExpectedCount))
},
interval = 1000.millis)
RpcUtil.retryUntilSatisfiedF(conditionF = () => {
node
.chainApiFromDb()
.flatMap(_.getFilterCount)
.map(_ == ExpectedCount)
},
interval = 1000.millis)

for {
_ <- hasBlocksF
@@ -3,6 +3,7 @@ package org.bitcoins.node
import akka.Done
import akka.actor.ActorSystem
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.node.config.NodeAppConfig

@@ -54,16 +55,18 @@ case class NeutrinoNode(
* @return
*/
override def sync(): Future[Unit] = {
val blockchainsF =
BlockHeaderDAO()(executionContext, chainConfig).getBlockchains()
for {
chainApi <- chainApiFromDb()
header <- chainApi.getBestBlockHeader()
filterHeaderCount <- chainApi.getFilterHeaderCount()
filterCount <- chainApi.getFilterCount()
peerMsgSender <- peerMsgSenderF
blockchains <- blockchainsF
} yield {
// Get all of our cached headers in case of a reorg
val cachedHeaders =
chainApi.blockchains.flatMap(_.headers).map(_.hashBE.flip)
val cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE.flip)
peerMsgSender.sendGetHeadersMessage(cachedHeaders)

// If we have started syncing filters headers
@@ -2,7 +2,7 @@ package org.bitcoins.node

import akka.Done
import akka.actor.ActorSystem
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.blockchain.ChainHandlerCached
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.{
BlockHeaderDAO,

@@ -58,10 +58,10 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
* our [[org.bitcoins.chain.blockchain.Blockchain Blockchain]]
*/
def chainApiFromDb()(implicit
executionContext: ExecutionContext): Future[ChainHandler] = {
ChainHandler.fromDatabase(BlockHeaderDAO(),
CompactFilterHeaderDAO(),
CompactFilterDAO())
executionContext: ExecutionContext): Future[ChainHandlerCached] = {
ChainHandlerCached.fromDatabase(BlockHeaderDAO(),
CompactFilterHeaderDAO(),
CompactFilterDAO())
}

/** Unlike our chain api, this is cached inside our node

@@ -69,8 +69,9 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
* the [[ChainApi chain api]] is updated inside of the p2p client
*/
lazy val clientF: Future[P2PClient] = {
val chainApiF = chainApiFromDb()
for {
chainApi <- chainApiFromDb()
chainApi <- chainApiF
} yield {
val peerMsgRecv: PeerMessageReceiver =
PeerMessageReceiver.newReceiver(chainApi = chainApi,

@@ -188,13 +189,16 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
* @return
*/
def sync(): Future[Unit] = {
val blockchainsF =
BlockHeaderDAO()(executionContext, chainAppConfig).getBlockchains()
for {
chainApi <- chainApiFromDb()
header <- chainApi.getBestBlockHeader()
blockchains <- blockchainsF
} yield {
// Get all of our cached headers in case of a reorg
val cachedHeaders =
chainApi.blockchains.flatMap(_.headers).map(_.hashBE.flip)
blockchains.flatMap(_.headers).map(_.hashBE.flip)
peerMsgSenderF.map(_.sendGetHeadersMessage(cachedHeaders))
logger.info(
s"Starting sync node, height=${header.height} hash=${header.hashBE}")
@@ -2,7 +2,7 @@ package org.bitcoins.node.networking.peer

import akka.Done
import akka.actor.ActorRefFactory
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.blockchain.ChainHandlerCached
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.{
BlockHeaderDAO,

@@ -291,7 +291,9 @@ object PeerMessageReceiver {
val filterHeaderDAO = CompactFilterHeaderDAO()
val filterDAO = CompactFilterDAO()
val chainHandlerF =
ChainHandler.fromDatabase(blockHeaderDAO, filterHeaderDAO, filterDAO)
ChainHandlerCached.fromDatabase(blockHeaderDAO,
filterHeaderDAO,
filterDAO)
for {
chainHandler <- chainHandlerF
} yield {
@@ -5,7 +5,7 @@ import java.net.InetSocketAddress
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import org.bitcoins.chain.ChainVerificationLogger
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.blockchain.{ChainHandler, ChainHandlerCached}
import org.bitcoins.chain.blockchain.sync.ChainSync
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models._

@@ -178,21 +178,32 @@ trait ChainUnitTest
() => ChainUnitTest.destroyAllTables())(test)
}

def withChainHandlerCached(test: OneArgAsyncTest): FutureOutcome = {
makeFixture(() => ChainUnitTest.createChainHandlerCached(),
() => ChainUnitTest.destroyAllTables())(test)
}

def withChainHandlerGenesisFilter(test: OneArgAsyncTest): FutureOutcome = {
makeFixture(() => createChainHandlerWithGenesisFilter(),
() => ChainUnitTest.destroyAllTables())(test)
}

def withChainHandlerCachedGenesisFilter(
test: OneArgAsyncTest): FutureOutcome = {
makeFixture(build = () => createChainHandlerCachedWithGenesisFilter(),
destroy = () => ChainUnitTest.destroyAllTables())(test)
}

/** Creates and populates BlockHeaderTable with block headers 562375 to 571375 */
def createPopulatedChainHandler(): Future[ChainHandler] = {
for {
blockHeaderDAO <- ChainUnitTest.createPopulatedBlockHeaderDAO()
filterHeaderDAO <- ChainUnitTest.createPopulatedFilterHeaderDAO()
filterDAO <- ChainUnitTest.createPopulatedFilterDAO()
chainHandler <- ChainHandler.fromDatabase(blockHeaderDAO = blockHeaderDAO,
filterHeaderDAO =
filterHeaderDAO,
filterDAO = filterDAO)
chainHandler = ChainHandler.fromDatabase(blockHeaderDAO = blockHeaderDAO,
filterHeaderDAO =
filterHeaderDAO,
filterDAO = filterDAO)
} yield chainHandler
}

@@ -207,6 +218,18 @@ trait ChainUnitTest
} yield filterChainApi.asInstanceOf[ChainHandler]
}

def createChainHandlerCachedWithGenesisFilter(): Future[
ChainHandlerCached] = {
for {
chainHandler <- createChainHandler()
filterHeaderChainApi <- chainHandler.processFilterHeader(
ChainUnitTest.genesisFilterHeaderDb.filterHeader,
ChainUnitTest.genesisHeaderDb.hashBE)
filterChainApi <-
filterHeaderChainApi.processFilter(ChainUnitTest.genesisFilterMessage)
} yield filterChainApi.asInstanceOf[ChainHandlerCached]
}

def withPopulatedChainHandler(test: OneArgAsyncTest): FutureOutcome = {
makeFixture(() => createPopulatedChainHandler(),
() => ChainUnitTest.destroyAllTables())(test)

@@ -413,6 +436,12 @@ object ChainUnitTest extends ChainVerificationLogger {
def createChainHandler()(implicit
ec: ExecutionContext,
appConfig: ChainAppConfig): Future[ChainHandler] = {
createChainHandlerCached()
}

def createChainHandlerCached()(implicit
ec: ExecutionContext,
appConfig: ChainAppConfig): Future[ChainHandlerCached] = {
val handlerWithGenesisHeaderF =
ChainUnitTest.setupHeaderTableWithGenesisHeader()

@@ -603,7 +632,8 @@ object ChainUnitTest extends ChainVerificationLogger {
/** Creates the [[org.bitcoins.chain.models.BlockHeaderTable]] and inserts the genesis header */
def setupHeaderTableWithGenesisHeader()(implicit
ec: ExecutionContext,
appConfig: ChainAppConfig): Future[(ChainHandler, BlockHeaderDb)] = {
appConfig: ChainAppConfig): Future[
(ChainHandlerCached, BlockHeaderDb)] = {
val tableSetupF = setupAllTables()

val genesisHeaderF = tableSetupF.flatMap { _ =>

@@ -622,14 +652,14 @@ object ChainUnitTest extends ChainVerificationLogger {

def makeChainHandler()(implicit
appConfig: ChainAppConfig,
ec: ExecutionContext): Future[ChainHandler] = {
ec: ExecutionContext): Future[ChainHandlerCached] = {
lazy val blockHeaderDAO = BlockHeaderDAO()
lazy val filterHeaderDAO = CompactFilterHeaderDAO()
lazy val filterDAO = CompactFilterDAO()

ChainHandler.fromDatabase(blockHeaderDAO = blockHeaderDAO,
filterHeaderDAO = filterHeaderDAO,
filterDAO = filterDAO)
ChainHandlerCached.fromDatabase(blockHeaderDAO = blockHeaderDAO,
filterHeaderDAO = filterHeaderDAO,
filterDAO = filterDAO)

}

@@ -1,6 +1,6 @@
package org.bitcoins.testkit.chain.fixture

import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.blockchain.{ChainHandler, ChainHandlerCached}
import org.bitcoins.chain.models.BlockHeaderDAO

/**

@@ -28,6 +28,11 @@ object ChainFixture {
case class GenesisChainHandlerWithGenesisFilters(chainHandler: ChainHandler)
extends ChainFixture

/** Genesis chain handler with the genesis block header cached in memory */
case class GenesisChainHandlerCachedWithGenesisFilters(
chainHandler: ChainHandlerCached)
extends ChainFixture

case class PopulatedChainHandler(chainHandler: ChainHandler)
extends ChainFixture

@@ -1,9 +1,11 @@
package org.bitcoins.testkit.chain.fixture

import org.bitcoins.core.util.FutureUtil
import org.bitcoins.testkit.chain.ChainUnitTest
import org.bitcoins.testkit.chain.fixture.ChainFixture.{
BitcoindZmqChainHandlerWithBlock,
Empty,
GenesisChainHandlerCachedWithGenesisFilters,
GenesisChainHandlerWithGenesisFilters,
GenisisBlockHeaderDAO,
GenisisChainHandler,

@@ -33,6 +35,9 @@ trait ChainFixtureHelper { this: ChainUnitTest =>
case ChainFixtureTag.GenesisChainHandlerWithFilter =>
createChainHandlerWithGenesisFilter()
.map(ChainFixture.GenesisChainHandlerWithGenesisFilters(_))
case ChainFixtureTag.GenesisChainHandlerCachedWithFilter =>
createChainHandlerCachedWithGenesisFilter()
.map(ChainFixture.GenesisChainHandlerCachedWithGenesisFilters(_))
case ChainFixtureTag.BitcoindZmqChainHandlerWithBlock =>
createBitcoindChainHandlerViaZmq().map(
BitcoindZmqChainHandlerWithBlock.apply)

@@ -41,12 +46,14 @@ trait ChainFixtureHelper { this: ChainUnitTest =>

def destroyFixture(fixture: ChainFixture): Future[Any] = {
fixture match {
case Empty => Future.successful(())
case Empty => FutureUtil.unit
case GenisisBlockHeaderDAO(_) => ChainUnitTest.destroyAllTables()
case PopulatedBlockHeaderDAO(_) => ChainUnitTest.destroyAllTables()
case GenisisChainHandler(_) => ChainUnitTest.destroyAllTables()
case GenesisChainHandlerWithGenesisFilters(_) =>
ChainUnitTest.destroyAllTables()
case GenesisChainHandlerCachedWithGenesisFilters(_) =>
ChainUnitTest.destroyAllTables()
case PopulatedChainHandler(_) => ChainUnitTest.destroyAllTables()
case BitcoindZmqChainHandlerWithBlock(bitcoindHandler) =>
destroyBitcoindChainHandlerViaZmq(bitcoindHandler)
@@ -27,6 +27,9 @@ object ChainFixtureTag {
case object GenesisChainHandlerWithFilter
extends ChainFixtureTag("GenesisChainHandlerWithFilter")

case object GenesisChainHandlerCachedWithFilter
extends ChainFixtureTag("GenesisChainHandlerCachedWithFilter")

case object PopulatedChainHandler
extends ChainFixtureTag("PopulatedChainHandler")

@@ -122,7 +122,8 @@ abstract class NodeTestUtil extends P2PLogger {
implicit ec: ExecutionContext): Future[Boolean] = {
val rpcCountF = rpc.getBlockCount
for {
filterCount <- node.chainApiFromDb().flatMap(_.getFilterCount())
chainApi <- node.chainApiFromDb()
filterCount <- chainApi.getFilterCount()
blockCount <- rpcCountF
} yield {
blockCount == filterCount

@@ -133,8 +134,8 @@ abstract class NodeTestUtil extends P2PLogger {
implicit ec: ExecutionContext): Future[Boolean] = {
val rpcCountF = rpc.getBlockCount
for {
filterHeaderCount <-
node.chainApiFromDb().flatMap(_.getFilterHeaderCount())
chainApi <- node.chainApiFromDb()
filterHeaderCount <- chainApi.getFilterHeaderCount()
blockCount <- rpcCountF
} yield {
blockCount == filterHeaderCount

@@ -148,7 +149,8 @@ abstract class NodeTestUtil extends P2PLogger {
ec: ExecutionContext): Future[Boolean] = {
val rpcCountF = rpc.getBlockCount
for {
count <- node.chainApiFromDb().flatMap(_.getBlockCount())
chainApi <- node.chainApiFromDb()
count <- chainApi.getBlockCount()
rpcCount <- rpcCountF
} yield {
rpcCount == count