Send GetHeadersMessage using all of our cached headers to prevent reorgs from stalling node (#1758)

* Send GetHeadersMessage using all of our cached headers to prevent reorgs from stalling node

* Add test, improve logging

* Start with correct chains

* Simplify error, change back to createAll
This commit is contained in:
Ben Carman 2020-08-10 13:04:12 -05:00 committed by GitHub
parent 305841f232
commit c3dc52ce90
7 changed files with 71 additions and 19 deletions

View File

@ -245,6 +245,8 @@ class ChainHandlerTest extends ChainDbUnitTest {
}
}
// B
// C -> D
it must "handle a very basic reorg where one chain is one block behind the best chain" in {
chainHandler: ChainHandler =>
val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler)
@ -278,13 +280,41 @@ class ChainHandlerTest extends ChainDbUnitTest {
for {
chainHandler <- chainHandlerDF
newHeaderD <- newHeaderDF
hash <- chainHandler.getBestBlockHash
hash <- chainHandler.getBestBlockHash()
} yield {
// assert that header D overtook header B
assert(hash == newHeaderD.hashBE)
}
}
// G -> A -> B
// G -> C -> D -> E
// Builds a two-block chain on top of the genesis header, confirms it is the
// best chain, then processes a competing three-block chain from the same
// genesis and verifies the chain handler reorgs to the longer chain's tip.
it must "handle a reorg where one chain is two blocks behind the best chain" in {
chainHandler: ChainHandler =>
for {
genesis <- chainHandler.getBestBlockHeader()
// first (soon-to-be-stale) chain: G -> A -> B
oldFirst = BlockHeaderHelper.buildNextHeader(genesis)
oldSecond = BlockHeaderHelper.buildNextHeader(oldFirst)
startChain = Vector(oldFirst, oldSecond)
toBeReorged <-
chainHandler.processHeaders(startChain.map(_.blockHeader))
oldTip <- toBeReorged.getBestBlockHeader()
// sanity check: the two-block chain is currently the best chain
_ = assert(oldTip.hashBE == oldSecond.hashBE)
// competing chain: G -> C -> D -> E, one block longer than the old chain
newFirst = BlockHeaderHelper.buildNextHeader(genesis)
newSecond = BlockHeaderHelper.buildNextHeader(newFirst)
third = BlockHeaderHelper.buildNextHeader(newSecond)
newChain = Vector(newFirst, newSecond, third)
reorged <- chainHandler.processHeaders(newChain.map(_.blockHeader))
newTip <- reorged.getBestBlockHeader()
} yield {
// the longer chain's tip must have overtaken the old tip
assert(newTip.hashBE == third.hashBE)
}
}
it must "NOT reorg to a shorter chain that just received a new block" in {
chainHandler: ChainHandler =>
val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler)

View File

@ -39,7 +39,8 @@ private[blockchain] trait BaseBlockChain extends SeqWrapper[BlockHeaderDb] {
val tip: BlockHeaderDb = headers.head
require(headers.size <= 1 || headers(1).height == tip.height - 1)
require(headers.size <= 1 || headers(1).height == tip.height - 1,
s"Headers must be in descending order, got ${headers.take(5)}")
/** The height of the chain */
val height: Int = tip.height
@ -118,7 +119,7 @@ private[blockchain] trait BaseBlockChainCompObject
//found a header to connect to!
val prevBlockHeader = blockchain.headers(prevHeaderIdx)
logger.debug(
s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain of ${blockchain.length} headers")
s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain of ${blockchain.length} headers with tip ${blockchain.tip.hashBE.hex}")
val chain = blockchain.fromValidHeader(prevBlockHeader)
val tipResult =
TipValidation.checkNewTip(newPotentialTip = header, chain)

View File

@ -104,7 +104,11 @@ case class ChainHandler(
}
val headersToBeCreated = {
blockchainUpdates.flatMap(_.successfulHeaders).distinct
// During reorgs, we can be sent a header twice
blockchainUpdates
.flatMap(_.successfulHeaders)
.distinct
.filterNot(blockchains.flatMap(_.headers).contains)
}
val chains = blockchainUpdates.map(_.blockchain)
@ -234,16 +238,11 @@ case class ChainHandler(
s"Unexpected previous header's height: ${prevHeader.height} != ${filterHeadersToCreate.head.height - 1}"
)
case None =>
if (
firstFilter.previousFilterHeaderBE == DoubleSha256DigestBE.empty && firstFilter.height == 0
) {
//we are ok, according to BIP157 the previous the genesis filter's prev hash should
//be the empty hash
()
} else {
sys.error(
s"Previous filter header does not exist: $firstFilter")
}
// If the previous filter header doesn't exist it must be for the genesis block
require(
firstFilter.previousFilterHeaderBE == DoubleSha256DigestBE.empty && firstFilter.height == 0,
s"Previous filter header does not exist: $firstFilter"
)
}
} else FutureUtil.unit
_ <- filterHeaderDAO.createAll(filterHeadersToCreate)

View File

@ -318,10 +318,10 @@ case class BlockHeaderDAO()(implicit
ec: ExecutionContext): Future[Vector[Blockchain]] = {
val chainTipsF = chainTips
chainTipsF.flatMap { tips =>
val nestedFuture: Vector[Future[Blockchain]] = tips.map { tip =>
getBlockchainFrom(tip)
val nestedFuture: Vector[Future[Vector[Blockchain]]] = tips.map { tip =>
getBlockchainsFrom(tip)
}
Future.sequence(nestedFuture)
Future.sequence(nestedFuture).map(_.flatten)
}
}
@ -335,6 +335,14 @@ case class BlockHeaderDAO()(implicit
Blockchain.fromHeaders(headers.sortBy(_.height)(Ordering.Int.reverse)))
}
/** Retrieves every blockchain that contains the given header as (or near) its
  * tip, looking back at most one difficulty-adjustment interval of headers.
  * The lower bound is clamped to height 0 so chains near genesis are handled.
  */
def getBlockchainsFrom(header: BlockHeaderDb)(implicit
ec: ExecutionContext): Future[Vector[Blockchain]] = {
// look back one difficulty-change interval, but never below genesis
val startHeight =
Math.max(0, header.height - appConfig.chain.difficultyChangeInterval)
getBlockchainsBetweenHeights(from = startHeight, to = header.height)
}
@tailrec
private def loop(
chains: Vector[Blockchain],

View File

@ -59,7 +59,10 @@ case class NeutrinoNode(
filterCount <- chainApi.getFilterCount
peerMsgSender <- peerMsgSenderF
} yield {
peerMsgSender.sendGetHeadersMessage(header.hashBE.flip)
// Get all of our cached headers in case of a reorg
val cachedHeaders =
chainApi.blockchains.flatMap(_.headers).map(_.hashBE.flip)
peerMsgSender.sendGetHeadersMessage(cachedHeaders)
// If we have started syncing filters headers
if (filterHeaderCount != 0) {

View File

@ -185,7 +185,10 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
chainApi <- chainApiFromDb()
header <- chainApi.getBestBlockHeader()
} yield {
peerMsgSenderF.map(_.sendGetHeadersMessage(header.hashBE.flip))
// Get all of our cached headers in case of a reorg
val cachedHeaders =
chainApi.blockchains.flatMap(_.headers).map(_.hashBE.flip)
peerMsgSenderF.map(_.sendGetHeadersMessage(cachedHeaders))
logger.info(
s"Starting sync node, height=${header.height} hash=${header.hashBE}")
}

View File

@ -84,6 +84,14 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
sendMsg(headersMsg)
}
/** Sends a `getheaders` message to the peer using the given block hashes as
  * the block locator. Duplicate hashes are removed and the locator is capped
  * at 101 entries, the maximum the P2P protocol allows.
  */
def sendGetHeadersMessage(
hashes: Vector[DoubleSha256Digest]): Future[Unit] = {
// GetHeadersMessage has a max of 101 hashes
val locator = hashes.distinct.take(101)
val headersMsg = GetHeadersMessage(locator)
logger.trace(s"Sending getheaders=$headersMsg to peer=${client.peer}")
sendMsg(headersMsg)
}
def sendHeadersMessage(): Future[Unit] = {
val sendHeadersMsg = SendHeadersMessage
sendMsg(sendHeadersMsg)