Rework NodeTestUtil to use a specific bestBlockHash (#5332)

* Rework NodeTestUtil to use a specific bestBlockHash, this is useful in reorg situations

Use param in reorg test, modify scaladoc

WIP

* Cleanup

* Try to fix usage of stopHashBE

* Cleanup

* WIP: Fix getCompactFilterStartHeight()

* Revert logback-test.xml

* Fix bug with isFiltersSynced() in reorg situations

* scalafmt, fix compile

* Fix another bug with isFiltersSynced()

* Fix compile
This commit is contained in:
Chris Stewart 2023-12-31 17:38:12 -06:00 committed by GitHub
parent 5e81ec5ed2
commit b39736fb8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 96 additions and 47 deletions

View File

@ -1181,6 +1181,8 @@ case class CompactFilterHeadersMessage(
filterHashes: Vector[DoubleSha256Digest])
extends DataPayload {
val stopHashBE: DoubleSha256DigestBE = stopHash.flip
/** The number of hashes in this message */
val filterHashesLength: CompactSizeUInt = CompactSizeUInt(
UInt64(filterHashes.length))

View File

@ -389,20 +389,26 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
val bitcoind0 = bitcoinds(0)
val bitcoind1 = bitcoinds(1)
for {
_ <- NodeTestUtil.awaitAllSync(node, bitcoind0)
_ <- NodeTestUtil.awaitSyncAndIBD(node, bitcoind0)
//disconnect bitcoind1 as we don't need it
nodeUri1 <- NodeTestUtil.getNodeURIFromBitcoind(bitcoind1)
_ <- bitcoind1.disconnectNode(nodeUri1)
bestBlockHash0 <- bitcoind0.getBestBlockHash()
//invalidate blockhash to force a reorg when next block is generated
_ <- bitcoind0.invalidateBlock(bestBlockHash0)
_ <- AsyncUtil.retryUntilSatisfiedF(
() => node.getConnectionCount.map(_ == 1),
1.second)
//now generate a block, make sure we sync with them
hashes <- bitcoind0.generate(1)
hashes0 <- bitcoind0.generate(1)
chainApi <- node.chainApiFromDb()
_ <- AsyncUtil.retryUntilSatisfiedF(() =>
chainApi.getHeader(hashes.head).map(_.isDefined))
chainApi.getHeader(hashes0.head).map(_.isDefined))
//generate another block to make sure the reorg is complete
_ <- bitcoind0.generate(1)
_ <- NodeTestUtil.awaitAllSync(node, bitcoind0)
hashes1 <- bitcoind0.generate(1)
_ <- NodeTestUtil.awaitAllSync(node = node,
bitcoind = bitcoind0,
bestBlockHashBE = Some(hashes1.head))
} yield succeed
}
}

View File

@ -74,7 +74,7 @@ case class PeerManager(
private def syncCompactFilters(
bestFilterHeader: CompactFilterHeaderDb,
chainApi: ChainApi,
compactFilterStartHeight: Int,
compactFilterStartHeightOpt: Option[Int],
nodeState: SyncNodeState)(implicit
chainAppConfig: ChainAppConfig): Future[Unit] = {
val syncPeer = nodeState.syncPeer
@ -97,15 +97,13 @@ case class PeerManager(
sendCompactFilterHeaderMsgF.flatMap { isSyncFilterHeaders =>
// If we have started syncing filters
if (
!isSyncFilterHeaders && compactFilterStartHeight < bestFilterHeader.height
) {
if (!isSyncFilterHeaders) {
PeerManager
.sendNextGetCompactFilterCommand(
peerMessageSenderApi = peerMsgSender,
chainApi = chainApi,
filterBatchSize = chainAppConfig.filterBatchSize,
startHeightOpt = Some(compactFilterStartHeight),
startHeightOpt = compactFilterStartHeightOpt,
stopBlockHash = bestFilterHeader.blockHashBE,
peer = syncPeer
)
@ -927,19 +925,19 @@ case class PeerManager(
} else {
syncCompactFilters(bestFilterHeader = bestFilterHeader,
chainApi = chainApi,
compactFilterStartHeight = bestFilter.height,
compactFilterStartHeightOpt = None,
nodeState = nodeState)
}
case (Some(bestFilterHeader), None) =>
val compactFilterStartHeightF =
val compactFilterStartHeightOptF =
PeerManager.getCompactFilterStartHeight(chainApi,
walletCreationTimeOpt)
for {
compactFilterStartHeight <- compactFilterStartHeightF
compactFilterStartHeightOpt <- compactFilterStartHeightOptF
_ <- syncCompactFilters(bestFilterHeader = bestFilterHeader,
chainApi = chainApi,
compactFilterStartHeight =
compactFilterStartHeight,
compactFilterStartHeightOpt =
compactFilterStartHeightOpt,
nodeState = nodeState)
} yield ()
@ -1112,11 +1110,10 @@ object PeerManager extends Logging {
chainApi.nextFilterHeaderBatchRange(stopBlockHash = stopBlockHash,
batchSize = filterBatchSize,
startHeightOpt = startHeightOpt)
_ = logger.info(
s"Requesting compact filters from $filterSyncMarkerOpt with peer=$peer startHeightOpt=$startHeightOpt stopBlockHashBE=$stopBlockHash")
res <- filterSyncMarkerOpt match {
case Some(filterSyncMarker) =>
logger.info(
s"Requesting compact filters from $filterSyncMarker with peer=$peer")
peerMessageSenderApi
.sendGetCompactFiltersMessage(filterSyncMarker)
.map(_ => true)
@ -1167,11 +1164,11 @@ object PeerManager extends Logging {
def getCompactFilterStartHeight(
chainApi: ChainApi,
walletCreationTimeOpt: Option[Instant])(implicit
ec: ExecutionContext): Future[Int] = {
ec: ExecutionContext): Future[Option[Int]] = {
chainApi.getBestFilter().flatMap {
case Some(f) =>
case Some(_) =>
//we have already started syncing filters, return the height of the last filter seen
Future.successful(f.height + 1)
Future.successful(None)
case None =>
walletCreationTimeOpt match {
case Some(instant) =>
@ -1189,10 +1186,11 @@ object PeerManager extends Logging {
//want to choose the maximum out of these too
//if our internal chainstate filter count is > creationTimeHeight
//we just want to start syncing from our last seen filter
Math.max(height, filterCount)
val result = Math.max(height, filterCount)
Some(result)
}
case None =>
Future.successful(0)
Future.successful(None)
}
}

View File

@ -182,9 +182,10 @@ case class DataMessageHandler(
for {
sortedBlockFilters <- sortedBlockFiltersF
sortedFilterMessages = sortedBlockFilters.map(_._2)
filterBestBlockHashBE = sortedFilterMessages.lastOption
.map(_.blockHashBE)
_ = logger.debug(
s"Processing ${filterBatch.size} filters bestBlockHashBE=${sortedFilterMessages.lastOption
.map(_.blockHashBE)}")
s"Processing ${filterBatch.size} filters bestBlockHashBE=${filterBestBlockHashBE}")
newChainApi <- chainApi.processFilters(sortedFilterMessages)
sortedGolombFilters = sortedBlockFilters.map(x =>
(x._1, x._3))
@ -524,7 +525,7 @@ case class DataMessageHandler(
private def sendFirstGetCompactFilterCommand(
peerMessageSenderApi: PeerMessageSenderApi,
stopBlockHash: DoubleSha256DigestBE,
startHeight: Int,
startHeightOpt: Option[Int],
syncNodeState: SyncNodeState): Future[Option[NodeState.FilterSync]] = {
logger.info(s"Beginning to sync filters to stopBlockHashBE=$stopBlockHash")
@ -538,7 +539,7 @@ case class DataMessageHandler(
}
sendNextGetCompactFilterCommand(peerMessageSenderApi = peerMessageSenderApi,
startHeightOpt = Some(startHeight),
startHeightOpt = startHeightOpt,
stopBlockHash = stopBlockHash,
fs = fs)
}
@ -756,7 +757,9 @@ case class DataMessageHandler(
val recoveredStateF: Future[NodeState] = getHeadersF.recoverWith {
case _: DuplicateHeaders =>
logger.warn(s"Received duplicate headers from ${peer} in state=$state")
Future.successful(headerSyncState)
val d = DoneSyncing(headerSyncState.peersWithServices,
headerSyncState.waitingForDisconnection)
Future.successful(d)
case _: InvalidBlockHeader =>
logger.warn(
s"Invalid headers of count $count sent from ${peer} in state=$state")
@ -792,8 +795,7 @@ case class DataMessageHandler(
val blockCountF = chainApi.getBlockCount()
val bestBlockHashF = chainApi.getBestBlockHash()
for {
_ <- chainApi.processFilterHeaders(filterHeaders,
filterHeader.stopHash.flip)
_ <- chainApi.processFilterHeaders(filterHeaders, filterHeader.stopHashBE)
filterHeaderCount <- chainApi.getFilterHeaderCount()
blockCount <- blockCountF
bestBlockHash <- bestBlockHashF
@ -804,18 +806,17 @@ case class DataMessageHandler(
sendNextGetCompactFilterHeadersCommand(
peerMessageSenderApi = peerMessageSenderApi,
syncPeer = peer,
prevStopHash = filterHeader.stopHash.flip,
prevStopHash = filterHeader.stopHashBE,
stopHash = bestBlockHash).map(_ => filterHeaderSync)
} else {
for {
startHeight <- PeerManager.getCompactFilterStartHeight(
startHeightOpt <- PeerManager.getCompactFilterStartHeight(
chainApi,
walletCreationTimeOpt)
bestBlockHash <- bestBlockHashF
filterSyncStateOpt <- sendFirstGetCompactFilterCommand(
peerMessageSenderApi = peerMessageSenderApi,
stopBlockHash = bestBlockHash,
startHeight = startHeight,
stopBlockHash = filterHeader.stopHashBE,
startHeightOpt = startHeightOpt,
syncNodeState = filterHeaderSync)
} yield {
filterSyncStateOpt match {

View File

@ -79,6 +79,29 @@ abstract class NodeTestUtil extends P2PLogger {
}
}
private def isSameBestFilter(
node: NeutrinoNode,
rpc: BitcoindRpcClient,
bestBlockHashBEOpt: Option[DoubleSha256DigestBE])(implicit
ec: ExecutionContext): Future[Boolean] = {
val bestBlockHashBEF = bestBlockHashBEOpt match {
case Some(bestBlockHash) => Future.successful(bestBlockHash)
case None => rpc.getBestBlockHash()
}
for {
bestBlockHashBE <- bestBlockHashBEF
chainApi <- node.chainApiFromDb()
bestFilterOpt <- chainApi.getBestFilter()
} yield {
bestFilterOpt match {
case Some(bestFilter) => bestFilter.blockHashBE == bestBlockHashBE
case None => false
}
}
}
def isSameBestFilterHeight(node: NeutrinoNode, rpc: BitcoindRpcClient)(
implicit ec: ExecutionContext): Future[Boolean] = {
val rpcCountF = rpc.getBlockCount()
@ -137,24 +160,35 @@ abstract class NodeTestUtil extends P2PLogger {
}
/** Awaits sync between the given node and bitcoind client */
def awaitCompactFiltersSync(node: NeutrinoNode, rpc: BitcoindRpcClient)(
implicit sys: ActorSystem): Future[Unit] = {
def awaitCompactFiltersSync(
node: NeutrinoNode,
rpc: BitcoindRpcClient,
bestBlockHashBEOpt: Option[DoubleSha256DigestBE] = None)(implicit
sys: ActorSystem): Future[Unit] = {
import sys.dispatcher
TestAsyncUtil
.retryUntilSatisfiedF(() => isSameBestFilterHeight(node, rpc),
1.second,
maxTries = syncTries)
.retryUntilSatisfiedF(
() => isSameBestFilter(node, rpc, bestBlockHashBEOpt),
1.second,
maxTries = syncTries)
}
/** The future doesn't complete until the nodes best hash is the given hash */
def awaitBestHash(node: Node, bitcoind: BitcoindRpcClient)(implicit
def awaitBestHash(
node: Node,
bitcoind: BitcoindRpcClient,
bestHashOpt: Option[DoubleSha256DigestBE] = None)(implicit
system: ActorSystem): Future[Unit] = {
import system.dispatcher
def bestBitcoinSHashF: Future[DoubleSha256DigestBE] = {
node.chainApiFromDb().flatMap(_.getBestBlockHash())
}
def bestBitcoindHashF: Future[DoubleSha256DigestBE] =
bitcoind.getBestBlockHash()
def bestBitcoindHashF: Future[DoubleSha256DigestBE] = {
bestHashOpt match {
case Some(bestHash) => Future.successful(bestHash)
case None => bitcoind.getBestBlockHash()
}
}
TestAsyncUtil.retryUntilSatisfiedF(
() => {
@ -168,14 +202,22 @@ abstract class NodeTestUtil extends P2PLogger {
)
}
/** Awaits header, filter header and filter sync between the neutrino node and rpc client */
def awaitAllSync(node: NeutrinoNode, bitcoind: BitcoindRpcClient)(implicit
/** Awaits header, filter header and filter sync between the neutrino node and rpc client
* @param the node we are syncing
* @param bitcoind the node we are syncing against
* @param bestBlockHashBE the best block hash we are expected to sync to, this is useful for reorg situations.
* If None given, we use bitcoind's best block header
*/
def awaitAllSync(
node: NeutrinoNode,
bitcoind: BitcoindRpcClient,
bestBlockHashBE: Option[DoubleSha256DigestBE] = None)(implicit
system: ActorSystem): Future[Unit] = {
import system.dispatcher
for {
_ <- NodeTestUtil.awaitBestHash(node, bitcoind)
_ <- NodeTestUtil.awaitBestHash(node, bitcoind, bestBlockHashBE)
_ <- NodeTestUtil.awaitCompactFilterHeadersSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind, bestBlockHashBE)
} yield ()
}