Move {syncFromNewPeer(), syncHelper()} into PeerManager, remove reference to Node inside of PeerManager (#5102)

* Move {syncFromNewPeer(), syncHelper()} into PeerManager, remove reference to Node inside of PeerManager

* Empty commit to re-run CI
This commit is contained in:
Chris Stewart 2023-06-13 14:18:22 -05:00 committed by GitHub
parent bed670fb6f
commit f4f45a1cad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 100 additions and 111 deletions

View file

@@ -2,16 +2,8 @@ package org.bitcoins.node
import akka.actor.ActorSystem
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse
import org.bitcoins.core.api.chain.db.{
BlockHeaderDb,
CompactFilterDb,
CompactFilterHeaderDb
}
import org.bitcoins.core.api.node.NodeType
import org.bitcoins.core.p2p.ServiceIdentifier
import org.bitcoins.core.protocol.BlockStamp
@@ -39,7 +31,7 @@ case class NeutrinoNode(
implicit override def chainAppConfig: ChainAppConfig = chainConfig
override lazy val peerManager: PeerManager =
PeerManager(paramPeers, this, walletCreationTimeOpt)
PeerManager(paramPeers, walletCreationTimeOpt)
override def start(): Future[NeutrinoNode] = {
val res = for {
@@ -74,97 +66,10 @@ case class NeutrinoNode(
chainApi <- chainApiFromDb()
_ <- chainApi.setSyncing(true)
_ <- peerAvailableF
_ <- syncHelper(None)
_ <- peerManager.syncHelper(None)
} yield ()
}
/** Helper method to sync the blockchain over the network
* @param syncPeerOpt if syncPeer is given, we send [[org.bitcoins.core.p2p.GetHeadersMessage]] to that peer. If None we gossip GetHeadersMessage to all peers
*/
private def syncHelper(syncPeerOpt: Option[Peer]): Future[Unit] = {
logger.info(s"Syncing with peerOpt=$syncPeerOpt")
val chainApi: ChainApi = ChainHandler.fromDatabase()
//eagerly start fetching our blockchains so it runs in parallel
//with the chainApi queries in the for-comprehension below
val blockchainsF =
BlockHeaderDAO()(executionContext, chainConfig).getBlockchains()
for {
header <- chainApi.getBestBlockHeader()
bestFilterHeaderOpt <- chainApi.getBestFilterHeader()
bestFilterOpt <- chainApi.getBestFilter()
blockchains <- blockchainsF
// Get all of our cached headers in case of a reorg
cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE)
_ <- {
//ask one specific peer for headers, or gossip the request to all peers
syncPeerOpt match {
case Some(peer) =>
peerManager.sendGetHeadersMessage(cachedHeaders, Some(peer))
case None => peerManager.gossipGetHeadersMessage(cachedHeaders)
}
}
hasStaleTip <- chainApi.isTipStale()
_ <- {
if (hasStaleTip) {
//if we have a stale tip, we will request to sync filter headers / filters
//after we are done syncing block headers
Future.unit
} else {
//tip is fresh, so start filter header / filter sync right away
syncFilters(bestFilterHeaderOpt = bestFilterHeaderOpt,
bestFilterOpt = bestFilterOpt,
bestBlockHeader = header,
chainApi = chainApi)
}
}
} yield {
logger.info(
s"Starting sync node, height=${header.height} hash=${header.hashBE.hex}")
}
}
/** Syncs compact filter headers / filters against the given best block header.
* Does nothing when filter header sync has not started yet, or when both
* filter headers and filters are already caught up to the best block header.
*/
private def syncFilters(
bestFilterHeaderOpt: Option[CompactFilterHeaderDb],
bestFilterOpt: Option[CompactFilterDb],
bestBlockHeader: BlockHeaderDb,
chainApi: ChainApi): Future[Unit] = {
// If we have started syncing filters headers
(bestFilterHeaderOpt, bestFilterOpt) match {
case (None, None) | (None, Some(_)) =>
//do nothing if we haven't started syncing
Future.unit
case (Some(bestFilterHeader), Some(bestFilter)) =>
val isFilterHeaderSynced =
bestFilterHeader.blockHashBE == bestBlockHeader.hashBE
val isFiltersSynced = {
//check if we have started syncing filters,
//and if so, see if filter headers and filters
//were in sync
bestFilter.hashBE == bestFilterHeader.filterHashBE
}
if (isFilterHeaderSynced && isFiltersSynced) {
//means we are in sync, with filter headers & block headers & filters
//if both filter headers and block headers are on
//an old tip, our event driven node will start syncing
//filters after block headers are in sync
//do nothing
Future.unit
} else {
//filters lag behind filter headers, keep syncing them
peerManager.syncCompactFilters(bestFilterHeader = bestFilterHeader,
chainApi = chainApi,
bestFilterOpt = Some(bestFilter))
}
case (Some(bestFilterHeader), None) =>
//we have filter headers but no filters yet, start filter sync
peerManager.syncCompactFilters(bestFilterHeader = bestFilterHeader,
chainApi = chainApi,
bestFilterOpt = None)
}
}
/** Picks a random peer that serves compact filters and starts syncing from it.
* @return the new peer we are syncing from, else None if no suitable peer was found
*/
override def syncFromNewPeer(): Future[Option[Peer]] = {
for {
syncPeerOpt <- peerManager.randomPeerWithService(
ServiceIdentifier.NODE_COMPACT_FILTERS)
_ <- syncHelper(syncPeerOpt)
} yield syncPeerOpt
}
/** Gets the number of compact filters in the database */
override def getFilterCount(): Future[Int] = {
for {
chainApi <- chainApiFromDb()
filterCount <- chainApi.getFilterCount()
} yield filterCount
}

View file

@@ -112,11 +112,6 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
*/
def sync(): Future[Unit]
/** Sync from a new peer
* @return the new peer we are syncing from, else None if we could not start syncing with another peer
*/
def syncFromNewPeer(): Future[Option[Peer]]
/** Broadcasts the given transaction over the P2P network */
override def broadcastTransactions(
transactions: Vector[Transaction]): Future[Unit] = {

View file

@@ -20,8 +20,13 @@ import grizzled.slf4j.Logging
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.core.api.chain.{ChainApi, FilterSyncMarker}
import org.bitcoins.core.api.chain.db.{CompactFilterDb, CompactFilterHeaderDb}
import org.bitcoins.core.api.chain.db.{
BlockHeaderDb,
CompactFilterDb,
CompactFilterHeaderDb
}
import org.bitcoins.core.api.node.NodeType
import org.bitcoins.core.p2p._
import org.bitcoins.core.protocol.transaction.Transaction
@@ -36,7 +41,7 @@ import org.bitcoins.node.util.{BitcoinSNodeUtil, PeerMessageSenderApi}
import scodec.bits.ByteVector
import java.net.InetAddress
import java.time.{Instant}
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
import scala.concurrent.duration.DurationInt
@@ -45,7 +50,6 @@ import scala.util.Random
case class PeerManager(
paramPeers: Vector[Peer] = Vector.empty,
node: NeutrinoNode,
walletCreationTimeOpt: Option[Instant])(implicit
ec: ExecutionContext,
system: ActorSystem,
@@ -557,9 +561,7 @@ case class PeerManager(
val shouldReconnect =
(forceReconnect || connectedPeerCount == 0) && isStarted.get
if (peers.exists(_ != peer) && syncPeerOpt.isDefined) {
node
.syncFromNewPeer()
.map(_ => ())
syncFromNewPeer().map(_ => ())
} else if (syncPeerOpt.isDefined) {
if (shouldReconnect) {
finder.reconnect(peer)
@@ -635,7 +637,7 @@ case class PeerManager(
sys.error(s"Cannot have state=$s and have a query timeout")
}
if (peer == syncPeer)
node.syncFromNewPeer().map(_ => ())
syncFromNewPeer().map(_ => ())
else Future.unit
}
}
@@ -646,7 +648,7 @@ case class PeerManager(
logger.info(s"Header request timed out from $peer in state $state")
state match {
case HeaderSync(_) | MisbehavingPeer(_) =>
node.syncFromNewPeer().map(_ => getDataMessageHandler)
syncFromNewPeer().map(_ => getDataMessageHandler)
case headerState @ ValidatingHeaders(_, _, failedCheck, _) =>
val newHeaderState = headerState.copy(failedCheck = failedCheck + peer)
@@ -748,7 +750,7 @@ case class PeerManager(
//disconnect the misbehaving peer
for {
_ <- removePeer(m.badPeer)
_ <- node.syncFromNewPeer()
_ <- syncFromNewPeer()
} yield msg
case removePeers: RemovePeers =>
updateDataMessageHandler(newDmh)
@@ -881,6 +883,93 @@ case class PeerManager(
this
}
/** Helper method to sync the blockchain over the network
*
* @param syncPeerOpt if syncPeer is given, we send [[org.bitcoins.core.p2p.GetHeadersMessage]] to that peer. If None we gossip GetHeadersMessage to all peers
*/
def syncHelper(syncPeerOpt: Option[Peer]): Future[Unit] = {
logger.info(s"Syncing with peerOpt=$syncPeerOpt")
val chainApi: ChainApi = ChainHandler.fromDatabase()
//eagerly start fetching our blockchains so it runs in parallel
//with the chainApi queries in the for-comprehension below
val blockchainsF =
BlockHeaderDAO()(ec, chainAppConfig).getBlockchains()
for {
header <- chainApi.getBestBlockHeader()
bestFilterHeaderOpt <- chainApi.getBestFilterHeader()
bestFilterOpt <- chainApi.getBestFilter()
blockchains <- blockchainsF
// Get all of our cached headers in case of a reorg
cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE)
_ <- {
//ask one specific peer for headers, or gossip the request to all peers
syncPeerOpt match {
case Some(peer) =>
sendGetHeadersMessage(cachedHeaders, Some(peer))
case None => gossipGetHeadersMessage(cachedHeaders)
}
}
hasStaleTip <- chainApi.isTipStale()
_ <- {
if (hasStaleTip) {
//if we have a stale tip, we will request to sync filter headers / filters
//after we are done syncing block headers
Future.unit
} else {
//tip is fresh, so start filter header / filter sync right away
syncFilters(bestFilterHeaderOpt = bestFilterHeaderOpt,
bestFilterOpt = bestFilterOpt,
bestBlockHeader = header,
chainApi = chainApi)
}
}
} yield {
logger.info(
s"Starting sync node, height=${header.height} hash=${header.hashBE.hex}")
}
}
/** Syncs compact filter headers / filters against the given best block header.
* Does nothing when filter header sync has not started yet, or when both
* filter headers and filters are already caught up to the best block header.
*/
private def syncFilters(
bestFilterHeaderOpt: Option[CompactFilterHeaderDb],
bestFilterOpt: Option[CompactFilterDb],
bestBlockHeader: BlockHeaderDb,
chainApi: ChainApi): Future[Unit] = {
// If we have started syncing filters headers
(bestFilterHeaderOpt, bestFilterOpt) match {
case (None, None) | (None, Some(_)) =>
//do nothing if we haven't started syncing
Future.unit
case (Some(bestFilterHeader), Some(bestFilter)) =>
val isFilterHeaderSynced =
bestFilterHeader.blockHashBE == bestBlockHeader.hashBE
val isFiltersSynced = {
//check if we have started syncing filters,
//and if so, see if filter headers and filters
//were in sync
bestFilter.hashBE == bestFilterHeader.filterHashBE
}
if (isFilterHeaderSynced && isFiltersSynced) {
//means we are in sync, with filter headers & block headers & filters
//if both filter headers and block headers are on
//an old tip, our event driven node will start syncing
//filters after block headers are in sync
//do nothing
Future.unit
} else {
//filters lag behind filter headers, keep syncing them
syncCompactFilters(bestFilterHeader = bestFilterHeader,
chainApi = chainApi,
bestFilterOpt = Some(bestFilter))
}
case (Some(bestFilterHeader), None) =>
//we have filter headers but no filters yet, start filter sync
syncCompactFilters(bestFilterHeader = bestFilterHeader,
chainApi = chainApi,
bestFilterOpt = None)
}
}
/** Picks a random peer that serves compact filters and starts syncing from it.
* @return the peer we are now syncing from, else None if no suitable peer was found
*/
def syncFromNewPeer(): Future[Option[Peer]] = {
val peerOptF =
randomPeerWithService(ServiceIdentifier.NODE_COMPACT_FILTERS)
peerOptF.flatMap { peerOpt =>
syncHelper(peerOpt).map(_ => peerOpt)
}
}
}
case class ResponseTimeout(payload: NetworkPayload)