mirror of
https://github.com/bitcoin-s/bitcoin-s.git
synced 2025-03-13 19:37:30 +01:00
2019 08 16 process header optimization (#701)
* WIP: implement processHeaders() in ChainHandler instead of processHeader() * Create ConnectTipResult which is returned from Blockchain.connectTip(), this decouples 'BlockchainUpdate' and Blockchain.connectTip(). This was needed because it doesn't make sense to have a Vector[BlockHeaderDb] returned from Blockchain.connectTip() as we are validating one header. However, to execute batch inserts, we need to accumulate headers somewhere. I choose to do this in BlockchainUpdate with the 'successfulHeaders' field. * make chain-verification log level INFO * Add unit test for benchmarking ChainHandler.processHeaders() that cuts out extraneous noise in another test case * Address code review
This commit is contained in:
parent
f27edd5b16
commit
ab170d05f9
8 changed files with 325 additions and 193 deletions
|
@ -25,14 +25,14 @@ class BlockchainTest extends ChainUnitTest {
|
|||
val newHeader =
|
||||
BlockHeaderHelper.buildNextHeader(ChainUnitTest.genesisHeaderDb)
|
||||
|
||||
val connectTip = Blockchain.connectTip(header = newHeader.blockHeader,
|
||||
Vector(blockchain))
|
||||
val connectTip =
|
||||
Blockchain.connectTip(header = newHeader.blockHeader, blockchain)
|
||||
|
||||
connectTip match {
|
||||
case BlockchainUpdate.Successful(_, connectedHeader) =>
|
||||
assert(newHeader == connectedHeader)
|
||||
case ConnectTipResult.ExtendChain(_, newChain) =>
|
||||
assert(newHeader == newChain.tip)
|
||||
|
||||
case fail: BlockchainUpdate.Failed =>
|
||||
case fail @ (_: ConnectTipResult.Reorg | _: ConnectTipResult.BadTip) =>
|
||||
assert(false)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,6 +38,13 @@ class ChainHandlerTest extends ChainUnitTest {
|
|||
mainnetAppConfig.withOverrides(memoryDb)
|
||||
}
|
||||
|
||||
val source = FileUtil.getFileAsSource("block_headers.json")
|
||||
val arrStr = source.getLines.next
|
||||
source.close()
|
||||
|
||||
import org.bitcoins.rpc.serializers.JsonReaders.BlockHeaderReads
|
||||
val headersResult = Json.parse(arrStr).validate[Vector[BlockHeader]].get
|
||||
|
||||
override val defaultTag: ChainFixtureTag = ChainFixtureTag.GenisisChainHandler
|
||||
|
||||
override def withFixture(test: OneArgAsyncTest): FutureOutcome =
|
||||
|
@ -83,18 +90,8 @@ class ChainHandlerTest extends ChainUnitTest {
|
|||
|
||||
it must "be able to process and fetch real headers from mainnet" in {
|
||||
chainHandler: ChainHandler =>
|
||||
val source = FileUtil.getFileAsSource("block_headers.json")
|
||||
val arrStr = source.getLines.next
|
||||
source.close()
|
||||
|
||||
import org.bitcoins.rpc.serializers.JsonReaders.BlockHeaderReads
|
||||
val headersResult = Json.parse(arrStr).validate[Vector[BlockHeader]]
|
||||
if (headersResult.isError) {
|
||||
fail(headersResult.toString)
|
||||
}
|
||||
|
||||
val blockHeaders =
|
||||
headersResult.get.drop(
|
||||
headersResult.drop(
|
||||
ChainUnitTest.FIRST_POW_CHANGE - ChainUnitTest.FIRST_BLOCK_HEIGHT)
|
||||
|
||||
val firstBlockHeaderDb =
|
||||
|
@ -135,6 +132,52 @@ class ChainHandlerTest extends ChainUnitTest {
|
|||
}
|
||||
}
|
||||
|
||||
it must "benchmark ChainHandler.processHeaders()" in {
|
||||
chainHandler: ChainHandler =>
|
||||
val blockHeaders =
|
||||
headersResult.drop(
|
||||
ChainUnitTest.FIRST_POW_CHANGE - ChainUnitTest.FIRST_BLOCK_HEIGHT)
|
||||
|
||||
val firstBlockHeaderDb =
|
||||
BlockHeaderDbHelper.fromBlockHeader(ChainUnitTest.FIRST_POW_CHANGE - 2,
|
||||
ChainTestUtil.blockHeader562462)
|
||||
|
||||
val secondBlockHeaderDb =
|
||||
BlockHeaderDbHelper.fromBlockHeader(ChainUnitTest.FIRST_POW_CHANGE - 1,
|
||||
ChainTestUtil.blockHeader562463)
|
||||
|
||||
val thirdBlockHeaderDb =
|
||||
BlockHeaderDbHelper.fromBlockHeader(ChainUnitTest.FIRST_POW_CHANGE,
|
||||
ChainTestUtil.blockHeader562464)
|
||||
|
||||
/*
|
||||
* We need to insert one block before the first POW check because it is used on the next
|
||||
* POW check. We then need to insert the next to blocks to circumvent a POW check since
|
||||
* that would require we have an old block in the Blockchain that we don't have.
|
||||
*/
|
||||
val firstThreeBlocks =
|
||||
Vector(firstBlockHeaderDb, secondBlockHeaderDb, thirdBlockHeaderDb)
|
||||
|
||||
val createdF = chainHandler.blockHeaderDAO.createAll(firstThreeBlocks)
|
||||
|
||||
createdF.flatMap { _ =>
|
||||
val blockchain = Blockchain.fromHeaders(firstThreeBlocks.reverse)
|
||||
val handler = ChainHandler(chainHandler.blockHeaderDAO, blockchain)
|
||||
|
||||
// Takes way too long to do all blocks
|
||||
val blockHeadersToTest = blockHeaders.tail
|
||||
.take(
|
||||
(2 * chainHandler.chainConfig.chain.difficultyChangeInterval + 1))
|
||||
|
||||
val processedF = handler.processHeaders(blockHeadersToTest)
|
||||
|
||||
for {
|
||||
ch <- processedF
|
||||
bestHash <- ch.getBestBlockHash
|
||||
} yield assert(bestHash == blockHeadersToTest.last.hashBE)
|
||||
}
|
||||
}
|
||||
|
||||
it must "handle a very basic reorg where one chain is one block behind the best chain" in {
|
||||
chainHandler: ChainHandler =>
|
||||
val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler)
|
||||
|
@ -214,7 +257,7 @@ class ChainHandlerTest extends ChainUnitTest {
|
|||
}
|
||||
|
||||
final def processHeaders(
|
||||
processorF: Future[ChainHandler],
|
||||
processorF: Future[ChainApi],
|
||||
remainingHeaders: List[BlockHeader],
|
||||
height: Int): Future[Assertion] = {
|
||||
remainingHeaders match {
|
||||
|
|
|
@ -20,7 +20,9 @@ trait ChainApi {
|
|||
* @return
|
||||
*/
|
||||
def processHeader(header: BlockHeader)(
|
||||
implicit ec: ExecutionContext): Future[ChainApi]
|
||||
implicit ec: ExecutionContext): Future[ChainApi] = {
|
||||
processHeaders(Vector(header))
|
||||
}
|
||||
|
||||
/** Process all of the given headers and returns a new [[ChainApi chain api]]
|
||||
* that contains these headers. This method processes headers in the order
|
||||
|
@ -29,12 +31,7 @@ trait ChainApi {
|
|||
* @return
|
||||
*/
|
||||
def processHeaders(headers: Vector[BlockHeader])(
|
||||
implicit ec: ExecutionContext): Future[ChainApi] = {
|
||||
headers.foldLeft(Future.successful(this)) {
|
||||
case (chainF, header) =>
|
||||
chainF.flatMap(_.processHeader(header))
|
||||
}
|
||||
}
|
||||
implicit ec: ExecutionContext): Future[ChainApi]
|
||||
|
||||
/** Get's a [[org.bitcoins.chain.models.BlockHeaderDb]] from the chain's database */
|
||||
def getHeader(hash: DoubleSha256DigestBE)(
|
||||
|
|
|
@ -6,6 +6,7 @@ import org.bitcoins.chain.validation.{TipUpdateResult, TipValidation}
|
|||
import org.bitcoins.core.protocol.blockchain.BlockHeader
|
||||
import org.bitcoins.chain.ChainVerificationLogger
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.{IndexedSeqLike, mutable}
|
||||
|
||||
/**
|
||||
|
@ -53,103 +54,154 @@ case class Blockchain(headers: Vector[BlockHeaderDb])
|
|||
}
|
||||
}
|
||||
|
||||
/** Unsafe version for [[org.bitcoins.chain.blockchain.Blockchain.fromHeader() fromHeader]] that can throw [[NoSuchElementException]] */
|
||||
def fromValidHeader(header: BlockHeaderDb): Blockchain = {
|
||||
fromHeader(header).get
|
||||
}
|
||||
|
||||
/** The height of the chain */
|
||||
def height: Int = tip.height
|
||||
|
||||
}
|
||||
|
||||
object Blockchain extends ChainVerificationLogger {
|
||||
|
||||
def fromHeaders(headers: Vector[BlockHeaderDb]): Blockchain = {
|
||||
|
||||
Blockchain(headers)
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to connect the given block header with the given blockchain
|
||||
* This is done via the companion object for blockchain because
|
||||
* we query [[org.bitcoins.chain.models.BlockHeaderDAO BlockHeaderDAO]] for the chain tips
|
||||
* We then attempt to connect this block header to all of our current
|
||||
* chain tips.
|
||||
* @param header the block header to connect to our chain
|
||||
* @param blockchains the blockchain we are attempting to connect to
|
||||
* @return a [[scala.concurrent.Future Future]] that contains a [[org.bitcoins.chain.blockchain.BlockchainUpdate BlockchainUpdate]] indicating
|
||||
* we [[org.bitcoins.chain.blockchain.BlockchainUpdate.Successful successful]] connected the tip,
|
||||
* or [[org.bitcoins.chain.blockchain.BlockchainUpdate.Failed Failed]] to connect to a tip
|
||||
* @param blockchain the blockchain we are attempting to connect to
|
||||
*/
|
||||
def connectTip(header: BlockHeader, blockchains: Vector[Blockchain])(
|
||||
implicit conf: ChainAppConfig): BlockchainUpdate = {
|
||||
def connectTip(header: BlockHeader, blockchain: Blockchain)(
|
||||
implicit conf: ChainAppConfig): ConnectTipResult = {
|
||||
logger.debug(
|
||||
s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain")
|
||||
|
||||
val tipResult: BlockchainUpdate = {
|
||||
val nested: Vector[BlockchainUpdate] = blockchains.map { blockchain =>
|
||||
val prevBlockHeaderIdxOpt =
|
||||
blockchain.headers.zipWithIndex.find {
|
||||
case (headerDb, _) =>
|
||||
headerDb.hashBE == header.previousBlockHashBE
|
||||
}
|
||||
prevBlockHeaderIdxOpt match {
|
||||
case None =>
|
||||
logger.warn(
|
||||
s"No common ancestor found in the chain to connect to ${header.hashBE}")
|
||||
val err = TipUpdateResult.BadPreviousBlockHash(header)
|
||||
val failed = BlockchainUpdate.Failed(blockchain = blockchain,
|
||||
failedHeader = header,
|
||||
tipUpdateFailure = err)
|
||||
failed
|
||||
|
||||
case Some((prevBlockHeader, prevHeaderIdx)) =>
|
||||
//found a header to connect to!
|
||||
logger.debug(
|
||||
s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain")
|
||||
val chain = blockchain.fromHeader(prevBlockHeader)
|
||||
val tipResult =
|
||||
TipValidation.checkNewTip(newPotentialTip = header, chain.get)
|
||||
|
||||
tipResult match {
|
||||
case TipUpdateResult.Success(headerDb) =>
|
||||
logger.debug(
|
||||
s"Successfully verified=${headerDb.hashBE.hex}, connecting to chain")
|
||||
val oldChain =
|
||||
blockchain.takeRight(blockchain.length - prevHeaderIdx)
|
||||
val newChain =
|
||||
Blockchain.fromHeaders(headerDb +: oldChain)
|
||||
BlockchainUpdate.Successful(newChain, headerDb)
|
||||
case fail: TipUpdateResult.Failure =>
|
||||
logger.warn(
|
||||
s"Could not verify header=${header.hashBE.hex}, reason=$fail")
|
||||
BlockchainUpdate.Failed(blockchain, header, fail)
|
||||
}
|
||||
val tipResult: ConnectTipResult = {
|
||||
val prevBlockHeaderIdxOpt =
|
||||
blockchain.headers.zipWithIndex.find {
|
||||
case (headerDb, _) =>
|
||||
headerDb.hashBE == header.previousBlockHashBE
|
||||
}
|
||||
}
|
||||
parseSuccessOrFailure(nested)
|
||||
}
|
||||
prevBlockHeaderIdxOpt match {
|
||||
case None =>
|
||||
logger.warn(
|
||||
s"No common ancestor found in the chain to connect to ${header.hashBE}")
|
||||
val err = TipUpdateResult.BadPreviousBlockHash(header)
|
||||
val failed = ConnectTipResult.BadTip(err)
|
||||
failed
|
||||
|
||||
case Some((prevBlockHeader, prevHeaderIdx)) =>
|
||||
//found a header to connect to!
|
||||
logger.debug(
|
||||
s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain")
|
||||
val chain = blockchain.fromValidHeader(prevBlockHeader)
|
||||
val tipResult =
|
||||
TipValidation.checkNewTip(newPotentialTip = header, chain)
|
||||
|
||||
tipResult match {
|
||||
case success: TipUpdateResult.Success =>
|
||||
logger.debug(
|
||||
s"Successfully verified=${success.header.hashBE.hex}, connecting to chain")
|
||||
val connectionIdx = blockchain.length - prevHeaderIdx
|
||||
|
||||
val oldChain =
|
||||
blockchain.takeRight(connectionIdx)
|
||||
val newChain =
|
||||
Blockchain.fromHeaders(success.headerDb +: oldChain)
|
||||
|
||||
if (connectionIdx != blockchain.length) {
|
||||
//means we have a reorg, since we aren't connecting to latest tip
|
||||
ConnectTipResult.Reorg(success, newChain)
|
||||
} else {
|
||||
//we just extended the latest tip
|
||||
ConnectTipResult.ExtendChain(success, newChain)
|
||||
}
|
||||
case fail: TipUpdateResult.Failure =>
|
||||
logger.warn(
|
||||
s"Could not verify header=${header.hashBE.hex}, reason=$fail")
|
||||
ConnectTipResult.BadTip(fail)
|
||||
}
|
||||
}
|
||||
}
|
||||
tipResult
|
||||
}
|
||||
|
||||
/** Takes in a vector of blockchain updates being executed asynchronously, we can only connect one [[BlockHeader header]]
|
||||
* to a tip successfully, which means _all_ other [[BlockchainUpdate updates]] must fail. This is a helper method
|
||||
* to find the one [[BlockchainUpdate.Successful successful]] update, or else returns one of the [[BlockchainUpdate.Failed failures]]
|
||||
* @return
|
||||
*/
|
||||
private def parseSuccessOrFailure(
|
||||
updates: Vector[BlockchainUpdate]): BlockchainUpdate = {
|
||||
require(updates.nonEmpty,
|
||||
s"Cannot parse success or failure if we don't have any updates!")
|
||||
val successfulTipOpt: Option[BlockchainUpdate] = {
|
||||
updates.find {
|
||||
case update: BlockchainUpdate =>
|
||||
update.isInstanceOf[BlockchainUpdate.Successful]
|
||||
/** Iterates through each given blockchains attempting to connect the given headers to that chain
|
||||
* @return The final updates for each chain
|
||||
*
|
||||
* */
|
||||
def connectHeadersToChains(
|
||||
headers: Vector[BlockHeader],
|
||||
blockchains: Vector[Blockchain])(
|
||||
implicit chainAppConfig: ChainAppConfig): Vector[BlockchainUpdate] = {
|
||||
logger.debug(
|
||||
s"Attempting to connect ${headers.length} headers to ${blockchains.length} blockchains")
|
||||
|
||||
@tailrec
|
||||
def loop(
|
||||
headersToProcess: Vector[BlockHeader],
|
||||
lastUpdates: Vector[BlockchainUpdate]): Vector[BlockchainUpdate] = {
|
||||
headersToProcess match {
|
||||
case h +: t =>
|
||||
val newUpdates: Vector[BlockchainUpdate] = lastUpdates
|
||||
.flatMap { lastUpdate =>
|
||||
val connectTipResult =
|
||||
Blockchain.connectTip(header = h,
|
||||
blockchain = lastUpdate.blockchain)
|
||||
parseConnectTipResult(connectTipResult, lastUpdate)
|
||||
}
|
||||
|
||||
loop(headersToProcess = t, lastUpdates = newUpdates)
|
||||
case Vector() =>
|
||||
lastUpdates
|
||||
}
|
||||
}
|
||||
|
||||
successfulTipOpt match {
|
||||
case Some(update) => update
|
||||
case None =>
|
||||
//if we didn't successfully connect a tip, just take the first failure we see
|
||||
updates.find {
|
||||
case update: BlockchainUpdate =>
|
||||
update.isInstanceOf[BlockchainUpdate.Failed]
|
||||
}.get
|
||||
val initUpdates = blockchains.map { blockchain =>
|
||||
BlockchainUpdate.Successful(blockchain, Vector.empty)
|
||||
}
|
||||
|
||||
loop(headers, initUpdates)
|
||||
}
|
||||
|
||||
/** Parses a connect tip result, and depending on the result it
|
||||
* 1. Extends the current chain by one block
|
||||
* 2. Causes a re-org, which returns the old best tip and the new competing chain
|
||||
* 3. Fails to connect tip, in which case it returns the old best chain
|
||||
* */
|
||||
private def parseConnectTipResult(
|
||||
connectTipResult: ConnectTipResult,
|
||||
lastUpdate: BlockchainUpdate): Vector[BlockchainUpdate] = {
|
||||
lastUpdate match {
|
||||
case _: BlockchainUpdate.Successful =>
|
||||
connectTipResult match {
|
||||
case ConnectTipResult.ExtendChain(tipUpdateResult, newChain) =>
|
||||
val update = BlockchainUpdate.Successful(
|
||||
newChain,
|
||||
tipUpdateResult.headerDb +: lastUpdate.successfulHeaders)
|
||||
Vector(update)
|
||||
|
||||
case ConnectTipResult.Reorg(tipUpdateResult, newChain) =>
|
||||
val competingUpdate = BlockchainUpdate.Successful(
|
||||
newChain,
|
||||
tipUpdateResult.headerDb +: lastUpdate.successfulHeaders)
|
||||
Vector(lastUpdate, competingUpdate)
|
||||
case ConnectTipResult.BadTip(tipUpdateResult) =>
|
||||
val failedUpdate = BlockchainUpdate.Failed(
|
||||
lastUpdate.blockchain,
|
||||
lastUpdate.successfulHeaders,
|
||||
tipUpdateResult.header,
|
||||
tipUpdateResult)
|
||||
Vector(failedUpdate)
|
||||
|
||||
}
|
||||
|
||||
case f: BlockchainUpdate.Failed => Vector(f)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,12 +1,11 @@
|
|||
package org.bitcoins.chain.blockchain
|
||||
|
||||
import org.bitcoins.chain.blockchain.BlockchainUpdate.{Failed, Successful}
|
||||
import org.bitcoins.chain.models.BlockHeaderDb
|
||||
import org.bitcoins.chain.validation.TipUpdateResult
|
||||
import org.bitcoins.core.protocol.blockchain.BlockHeader
|
||||
|
||||
/** Represents the state of an update to our [[org.bitcoins.chain.blockchain.Blockchain Blockchain]]
|
||||
* An example of a successful update is receiving a [[org.bitcoins.core.protocol.blockchain.BlockHeader BlockHeader]] and successfully
|
||||
* adding it to our database.
|
||||
/** Represens the state of a batch of [[org.bitcoins.core.protocol.blockchain.BlockHeader BlockHeaders]] being added to our blockchain
|
||||
*
|
||||
* An example of a [[org.bitcoins.chain.blockchain.BlockchainUpdate.Failed Failed]] update
|
||||
* is when we receive a [[org.bitcoins.core.protocol.blockchain.BlockHeader BlockHeader]] that is invalid and because of a
|
||||
|
@ -14,22 +13,62 @@ import org.bitcoins.core.protocol.blockchain.BlockHeader
|
|||
* because of [[org.bitcoins.chain.validation.TipUpdateResult.BadPOW BadPOW]] or a
|
||||
* [[org.bitcoins.chain.validation.TipUpdateResult.BadNonce BadNonce]] etc
|
||||
*/
|
||||
sealed abstract class BlockchainUpdate
|
||||
sealed abstract class BlockchainUpdate {
|
||||
|
||||
/** The successful headers in this batch blockchain update that need to be persisted */
|
||||
def successfulHeaders: Vector[BlockHeaderDb]
|
||||
|
||||
/** Our current blockchain */
|
||||
def blockchain: Blockchain
|
||||
|
||||
def withSuccessfulHeaders(headers: Vector[BlockHeaderDb]): BlockchainUpdate =
|
||||
this match {
|
||||
case s: Successful =>
|
||||
s.copy(successfulHeaders = headers)
|
||||
case f: Failed =>
|
||||
f.copy(successfulHeaders = headers)
|
||||
}
|
||||
}
|
||||
|
||||
object BlockchainUpdate {
|
||||
|
||||
/** The key thing we receive here is [[org.bitcoins.chain.models.BlockHeaderDb BlockHeaderDb]]
|
||||
* with a height assigned to it this happens after
|
||||
* calling [[org.bitcoins.chain.blockchain.ChainHandler.processHeader ChainHandler.processHeader]]
|
||||
* calling [[org.bitcoins.chain.blockchain.ChainHandler.processHeaders ChainHandler.processHeaders]]
|
||||
*/
|
||||
case class Successful(blockchain: Blockchain, updatedHeader: BlockHeaderDb)
|
||||
case class Successful(
|
||||
blockchain: Blockchain,
|
||||
successfulHeaders: Vector[BlockHeaderDb])
|
||||
extends BlockchainUpdate {
|
||||
def height: Long = updatedHeader.height
|
||||
if (successfulHeaders.nonEmpty) {
|
||||
require(
|
||||
blockchain.tip == successfulHeaders.head,
|
||||
s"Tip did not equal last successful header, tip=${blockchain.tip.hashBE} lastSuccessfulHeader=${successfulHeaders.head.hashBE}"
|
||||
)
|
||||
}
|
||||
def height: Long = blockchain.height
|
||||
}
|
||||
|
||||
/**
|
||||
* Means we failed to update the given blockchain with _ALL_ given headers
|
||||
* This means we could have had a partially successful update, with the headers/blockchain
|
||||
* returned in this case class
|
||||
*/
|
||||
case class Failed(
|
||||
blockchain: Blockchain,
|
||||
successfulHeaders: Vector[BlockHeaderDb],
|
||||
failedHeader: BlockHeader,
|
||||
tipUpdateFailure: TipUpdateResult.Failure)
|
||||
extends BlockchainUpdate
|
||||
extends BlockchainUpdate {
|
||||
require(
|
||||
!blockchain.contains(failedHeader),
|
||||
s"Our blockchain should not contain the failed header=${failedHeader}")
|
||||
|
||||
if (successfulHeaders.nonEmpty) {
|
||||
require(
|
||||
successfulHeaders.head == blockchain.tip,
|
||||
s"Our blockchain.tip should be the first successful header, got=${blockchain.tip} expected=${successfulHeaders.head}"
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,18 +1,11 @@
|
|||
package org.bitcoins.chain.blockchain
|
||||
|
||||
import org.bitcoins.chain.ChainVerificationLogger
|
||||
import org.bitcoins.chain.api.ChainApi
|
||||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
import org.bitcoins.chain.models.{BlockHeaderDAO, BlockHeaderDb}
|
||||
import org.bitcoins.chain.validation.TipUpdateResult
|
||||
import org.bitcoins.chain.validation.TipUpdateResult.{
|
||||
BadNonce,
|
||||
BadPOW,
|
||||
BadPreviousBlockHash
|
||||
}
|
||||
import org.bitcoins.core.crypto.DoubleSha256DigestBE
|
||||
import org.bitcoins.core.protocol.blockchain.BlockHeader
|
||||
import org.bitcoins.core.util.FutureUtil
|
||||
import org.bitcoins.chain.ChainVerificationLogger
|
||||
|
||||
import scala.concurrent.{ExecutionContext, Future}
|
||||
|
||||
|
@ -48,81 +41,27 @@ case class ChainHandler(
|
|||
}
|
||||
}
|
||||
|
||||
override def processHeader(header: BlockHeader)(
|
||||
implicit ec: ExecutionContext): Future[ChainHandler] = {
|
||||
logger.debug(
|
||||
s"Processing header=${header.hashBE.hex}, previousHash=${header.previousBlockHashBE.hex}")
|
||||
|
||||
val blockchainUpdate =
|
||||
Blockchain.connectTip(header = header, blockchains = blockchains)
|
||||
|
||||
val newHandlerF = blockchainUpdate match {
|
||||
case BlockchainUpdate.Successful(newChain, updatedHeader) =>
|
||||
//now we have successfully connected the header, we need to insert
|
||||
//it into the database
|
||||
val createdF = blockHeaderDAO.create(updatedHeader)
|
||||
createdF.map { header =>
|
||||
logger.debug(
|
||||
s"Connected new header to blockchain, height=${header.height} hash=${header.hashBE}")
|
||||
val chainIdxOpt = blockchains.zipWithIndex.find {
|
||||
case (chain, _) =>
|
||||
val oldTip = newChain(1) //should be safe, even with genesis header as we just connected a tip
|
||||
oldTip == chain.tip
|
||||
}
|
||||
|
||||
val updatedChains = {
|
||||
chainIdxOpt match {
|
||||
case Some((_, idx)) =>
|
||||
logger.trace(
|
||||
s"Updating chain at idx=${idx} out of competing chains=${blockchains.length} with new tip=${header.hashBE.hex}")
|
||||
blockchains.updated(idx, newChain)
|
||||
|
||||
case None =>
|
||||
logger.info(
|
||||
s"New competing blockchain with tip=${newChain.tip}")
|
||||
blockchains.:+(newChain)
|
||||
}
|
||||
}
|
||||
|
||||
ChainHandler(blockHeaderDAO, updatedChains)
|
||||
}
|
||||
case BlockchainUpdate.Failed(_, _, reason) =>
|
||||
val errMsg =
|
||||
s"Failed to add header to chain, header=${header.hashBE.hex} reason=${reason}"
|
||||
logger.warn(errMsg)
|
||||
// potential chain split happening, let's log what's going on
|
||||
logTipConnectionFailure(reason).flatMap { _ =>
|
||||
Future.failed(new RuntimeException(errMsg))
|
||||
}
|
||||
/** @inheritdoc */
|
||||
override def processHeaders(headers: Vector[BlockHeader])(
|
||||
implicit ec: ExecutionContext): Future[ChainApi] = {
|
||||
val blockchainUpdates: Vector[BlockchainUpdate] = {
|
||||
Blockchain.connectHeadersToChains(headers, blockchains)
|
||||
}
|
||||
|
||||
newHandlerF
|
||||
}
|
||||
|
||||
/** Logs a tip connection failure by querying local chain state
|
||||
* and comparing it to the received `TipUpdateResult`
|
||||
*/
|
||||
private def logTipConnectionFailure(failure: TipUpdateResult.Failure)(
|
||||
implicit ec: ExecutionContext): Future[Unit] = {
|
||||
failure match {
|
||||
case _ @(_: BadPOW | _: BadNonce) =>
|
||||
// TODO: Log this in a meaningful way
|
||||
FutureUtil.unit
|
||||
case _: BadPreviousBlockHash =>
|
||||
blockHeaderDAO.chainTips.map { tips =>
|
||||
if (tips.length > 1) {
|
||||
logger.warn {
|
||||
s"We have multiple (${tips.length}) , competing chainTips=${tips
|
||||
.map(_.hashBE.hex)
|
||||
.mkString("[", ",", "]")}"
|
||||
}
|
||||
} else {
|
||||
logger.warn(
|
||||
s"We don't have competing chainTips. Most recent, valid header=${tips.head.hashBE.hex}")
|
||||
}
|
||||
}
|
||||
val headersToBeCreated = {
|
||||
blockchainUpdates.flatMap(_.successfulHeaders).distinct
|
||||
}
|
||||
|
||||
val chains = blockchainUpdates.map(_.blockchain)
|
||||
|
||||
val createdF = blockHeaderDAO.createAll(headersToBeCreated)
|
||||
|
||||
val newChainHandler =
|
||||
ChainHandler(blockHeaderDAO = blockHeaderDAO, blockchains = chains)
|
||||
|
||||
createdF.map { _ =>
|
||||
newChainHandler
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
package org.bitcoins.chain.blockchain
|
||||
|
||||
import org.bitcoins.chain.models.BlockHeaderDb
|
||||
import org.bitcoins.chain.validation.TipUpdateResult
|
||||
import org.bitcoins.core.protocol.blockchain.BlockHeader
|
||||
|
||||
/** The result indicating how the [[org.bitcoins.chain.validation.TipUpdateResult TipUpdateResult]]
|
||||
* modified the chain.
|
||||
*
|
||||
* We can
|
||||
*
|
||||
* 1. Extend the chain
|
||||
* 2. Reorg the chain
|
||||
* 3. Fail to connect to anything in the chain
|
||||
*
|
||||
* */
|
||||
sealed trait ConnectTipResult {
|
||||
def tipUpdateResult: TipUpdateResult
|
||||
|
||||
lazy val header: BlockHeader = tipUpdateResult.header
|
||||
|
||||
}
|
||||
|
||||
object ConnectTipResult {
|
||||
|
||||
/** Indicates we sucuessfully extended our chain by one block */
|
||||
case class ExtendChain(
|
||||
tipUpdateResult: TipUpdateResult.Success,
|
||||
newChain: Blockchain)
|
||||
extends ConnectTipResult {
|
||||
require(
|
||||
headerDb == newChain.tip,
|
||||
s"Cannot extend chain without having tipUpdate be our best tip, tipUpdateResult=${tipUpdateResult.header} chain.tip=${newChain.tip}"
|
||||
)
|
||||
lazy val headerDb: BlockHeaderDb = tipUpdateResult.headerDb
|
||||
}
|
||||
|
||||
/**
|
||||
* Means we had a reorg happen, aka the header was connected to
|
||||
* something that was _not_ our previous best tip
|
||||
* @param tipUpdateResult the successful connection
|
||||
* @param newChain the new chain where the best tip is the header we passed in
|
||||
*/
|
||||
case class Reorg(
|
||||
tipUpdateResult: TipUpdateResult.Success,
|
||||
newChain: Blockchain)
|
||||
extends ConnectTipResult {
|
||||
require(
|
||||
headerDb == newChain.tip,
|
||||
s"Cannot reorg without having tipUpdate be our best tip, tipUpdateResult=${tipUpdateResult.header} chain.tip=${newChain.tip}")
|
||||
|
||||
lazy val headerDb: BlockHeaderDb = tipUpdateResult.headerDb
|
||||
}
|
||||
|
||||
/** Means we could not connect the header to anything in the given blockchain */
|
||||
case class BadTip(tipUpdateResult: TipUpdateResult.Failure)
|
||||
extends ConnectTipResult
|
||||
|
||||
}
|
|
@ -6,26 +6,29 @@ import org.bitcoins.core.protocol.blockchain.BlockHeader
|
|||
/** Represents the result of updating the chain with
|
||||
* the given header
|
||||
*/
|
||||
sealed abstract class TipUpdateResult
|
||||
sealed abstract class TipUpdateResult {
|
||||
def header: BlockHeader
|
||||
}
|
||||
|
||||
object TipUpdateResult {
|
||||
|
||||
/** Indicates we successfully update the chain tip with this header */
|
||||
case class Success(header: BlockHeaderDb) extends TipUpdateResult
|
||||
|
||||
sealed abstract class Failure extends TipUpdateResult {
|
||||
def header: BlockHeader
|
||||
case class Success(headerDb: BlockHeaderDb) extends TipUpdateResult {
|
||||
override def header = headerDb.blockHeader
|
||||
}
|
||||
|
||||
sealed abstract class Failure extends TipUpdateResult
|
||||
|
||||
/** Means that [[org.bitcoins.core.protocol.blockchain.BlockHeader.previousBlockHashBE previousBlockHashBE]] was incorrect */
|
||||
case class BadPreviousBlockHash(header: BlockHeader) extends Failure {
|
||||
case class BadPreviousBlockHash(override val header: BlockHeader)
|
||||
extends Failure {
|
||||
override def toString: String =
|
||||
s"BadPreviousBlockHash(hash=${header.hashBE}, previous=${header.previousBlockHashBE})"
|
||||
}
|
||||
|
||||
/** Means that [[org.bitcoins.core.protocol.blockchain.BlockHeader.nBits nBits]] was invalid */
|
||||
case class BadPOW(header: BlockHeader) extends Failure
|
||||
case class BadPOW(override val header: BlockHeader) extends Failure
|
||||
|
||||
/** Means that [[org.bitcoins.core.protocol.blockchain.BlockHeader.nonce nonce]] was invalid */
|
||||
case class BadNonce(header: BlockHeader) extends Failure
|
||||
case class BadNonce(override val header: BlockHeader) extends Failure
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue