2023 04 20 decouple node (#5049)

* Remove node parameter from PeerMessageReceiver, pass the ControlMessageHandler and DataMessageHandler as parameters

* Remove node reference in DataMessageHandler, just pass PeerManager as parameter

* Fix pattern match to be exhaustive

* Move fetchCompactFilterHeaders, sendFirstGetCompactFilterHeadersCommand out of DataMessageHandler to decouple DataMessageHandler, PeerManager

* scalafmt
This commit is contained in:
Chris Stewart 2023-04-20 18:39:30 -05:00 committed by GitHub
parent 3740b4fc35
commit 18482c7e44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 128 additions and 104 deletions

View File

@ -100,7 +100,10 @@ class P2PClientActorTest
val peerMessageReceiverF =
for {
node <- NodeUnitTest.buildNode(peer, None)
} yield PeerMessageReceiver(node, peer)
} yield PeerMessageReceiver(
controlMessageHandler = node.controlMessageHandler,
dataMessageHandler = node.getDataMessageHandler,
peer = peer)
val clientActorF: Future[TestActorRef[P2PClientActor]] =
peerMessageReceiverF.map { peerMsgRecv =>

View File

@ -51,7 +51,7 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
dataMessageHandler = DataMessageHandler(
chainApi = chainApi,
walletCreationTimeOpt = None,
node = node,
peerManager = node.peerManager,
state = HeaderSync,
initialSyncDone = None,
filterBatchCache = Set.empty,
@ -101,7 +101,7 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
DataMessageHandler(
chainApi = chainApi,
walletCreationTimeOpt = None,
node = node,
peerManager = node.peerManager,
state = HeaderSync,
initialSyncDone = None,
filterBatchCache = Set.empty,
@ -145,7 +145,7 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
DataMessageHandler(
chainApi = chainApi,
walletCreationTimeOpt = None,
node = node,
peerManager = node.peerManager,
state = HeaderSync,
initialSyncDone = None,
filterBatchCache = Set.empty,
@ -211,7 +211,7 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
DataMessageHandler(
chainApi = chainApi,
walletCreationTimeOpt = None,
node = node,
peerManager = node.peerManager,
state = HeaderSync,
initialSyncDone = None,
filterBatchCache = Set.empty,

View File

@ -52,7 +52,7 @@ case class NeutrinoNode(
DataMessageHandler(
chainApi = chainApi,
walletCreationTimeOpt = walletCreationTimeOpt,
node = this,
peerManager = peerManager,
state = HeaderSync,
initialSyncDone = None,
filterBatchCache = Set.empty,
@ -71,7 +71,7 @@ case class NeutrinoNode(
this
}
override val peerManager: PeerManager = PeerManager(paramPeers, this)
override lazy val peerManager: PeerManager = PeerManager(paramPeers, this)
override def start(): Future[NeutrinoNode] = {
val res = for {

View File

@ -37,7 +37,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
implicit def executionContext: ExecutionContext = system.dispatcher
val peerManager: PeerManager
def peerManager: PeerManager
/** The current data message handler.
* It should be noted that the dataMessageHandler contains

View File

@ -33,7 +33,9 @@ case class PeerData(
private lazy val client: Future[P2PClient] = {
val peerMessageReceiver =
PeerMessageReceiver(node, peer)
PeerMessageReceiver(node.controlMessageHandler,
node.getDataMessageHandler,
peer)
P2PClient(
peer = peer,
peerMessageReceiver = peerMessageReceiver,

View File

@ -5,9 +5,11 @@ import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.node.NodeType
import org.bitcoins.core.p2p._
import org.bitcoins.core.util.{NetworkUtil, StartStopAsync}
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.{Peer, PeerDAO, PeerDb}
import org.bitcoins.node.networking.peer._
@ -468,6 +470,62 @@ case class PeerManager(
val dataMessageStream: SourceQueueWithComplete[StreamDataMessageWrapper] =
dataMessageStreamSource.to(dataMessageStreamSink).run()
/** Picks a random peer advertising NODE_COMPACT_FILTERS and kicks off the first
  * getcfheaders request against it (moved here from DataMessageHandler by this commit).
  * Returns the handler with `syncPeer` set to the chosen peer when the request was
  * actually sent, or cleared (`None`) otherwise.
  */
def fetchCompactFilterHeaders(
currentDmh: DataMessageHandler): Future[DataMessageHandler] = {
for {
peer <- randomPeerWithService(ServiceIdentifier.NODE_COMPACT_FILTERS)
newDmh = currentDmh.copy(syncPeer = Some(peer))
_ = logger.info(s"Now syncing filter headers from $peer")
sender <- peerDataMap(peer).peerMessageSender
newSyncing <- PeerManager.sendFirstGetCompactFilterHeadersCommand(
sender,
currentDmh.chainApi)
} yield {
// only keep the peer as syncPeer if the getcfheaders request was sent
val syncPeerOpt = if (newSyncing) {
Some(peer)
} else {
None
}
newDmh.copy(syncPeer = syncPeerOpt)
}
}
}
// Signals that no response to `payload` arrived in time from the peer
// (inferred from the name; confirm against the actor/stream that emits it).
case class ResponseTimeout(payload: NetworkPayload)
object PeerManager {
/** Sends the first getcfheaders request for the next batch of compact filter
  * headers (extracted from DataMessageHandler so it can be shared — see commit
  * message). Starts from the best known filter header's block hash, or from the
  * empty hash when none is stored yet, and asks `chainApi` for the next batch
  * range of size `chainConfig.filterHeaderBatchSize`.
  *
  * Returns `true` once the message was sent; note there is no `false` path —
  * when no batch range can be computed the returned future fails via
  * `sys.error` (likely DB corruption, per the message).
  */
def sendFirstGetCompactFilterHeadersCommand(
peerMsgSender: PeerMessageSender,
chainApi: ChainApi)(implicit
ec: ExecutionContext,
chainConfig: ChainAppConfig): Future[Boolean] = {
for {
bestFilterHeaderOpt <-
chainApi
.getBestFilterHeader()
filterCount <- chainApi.getFilterCount()
// resume from the last synced filter header, or from genesis (empty hash)
blockHash = bestFilterHeaderOpt match {
case Some(filterHeaderDb) =>
filterHeaderDb.blockHashBE
case None =>
DoubleSha256DigestBE.empty
}
hashHeightOpt <- chainApi.nextBlockHeaderBatchRange(
prevStopHash = blockHash,
batchSize = chainConfig.filterHeaderBatchSize)
res <- hashHeightOpt match {
case Some(filterSyncMarker) =>
peerMsgSender
.sendGetCompactFilterHeadersMessage(filterSyncMarker)
.map(_ => true)
case None =>
sys.error(
s"Could not find block header in database to sync filter headers from! It's likely your database is corrupted blockHash=$blockHash bestFilterHeaderOpt=$bestFilterHeaderOpt filterCount=$filterCount")
}
} yield res
}
}

View File

@ -13,7 +13,7 @@ import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models._
import org.bitcoins.node.networking.peer.DataMessageHandlerState._
import org.bitcoins.node.{Node, P2PLogger, PeerManager}
import org.bitcoins.node.{P2PLogger, PeerManager}
import java.time.Instant
import scala.concurrent.{ExecutionContext, Future, Promise}
@ -30,7 +30,7 @@ import scala.util.control.NonFatal
case class DataMessageHandler(
chainApi: ChainApi,
walletCreationTimeOpt: Option[Instant],
node: Node,
peerManager: PeerManager,
state: DataMessageHandlerState,
initialSyncDone: Option[Promise[Done]],
filterBatchCache: Set[CompactFilterMessage],
@ -51,14 +51,12 @@ case class DataMessageHandler(
syncPeer = None,
state = HeaderSync)
def manager: PeerManager = node.peerManager
def addToStream(
payload: DataPayload,
peerMsgSender: PeerMessageSender,
peer: Peer): Future[Unit] = {
val msg = DataMessageWrapper(payload, peerMsgSender, peer)
manager.dataMessageStream.offer(msg).map(_ => ())
peerManager.dataMessageStream.offer(msg).map(_ => ())
}
private def isChainIBD: Future[Boolean] = {
@ -300,7 +298,8 @@ case class DataMessageHandler(
case ValidatingHeaders(inSyncWith, _, _) =>
//In the validation stage, some peer sent max amount of valid headers, revert to HeaderSync with that peer as syncPeer
//disconnect the ones that we have already checked since they are at least out of sync by 2000 headers
val removeFs = inSyncWith.map(p => manager.removePeer(p))
val removeFs =
inSyncWith.map(p => peerManager.removePeer(p))
val newSyncPeer = Some(peer)
@ -340,16 +339,17 @@ case class DataMessageHandler(
// headers are synced now with the current sync peer, now move to validating it for all peers
assert(syncPeer.get == peer)
if (manager.peers.size > 1) {
if (peerManager.peers.size > 1) {
val newState =
ValidatingHeaders(inSyncWith = Set(peer),
verifyingWith = manager.peers.toSet,
verifyingWith =
peerManager.peers.toSet,
failedCheck = Set.empty[Peer])
logger.info(
s"Starting to validate headers now. Verifying with ${newState.verifyingWith}")
val getHeadersAllF = manager.peerDataMap
val getHeadersAllF = peerManager.peerDataMap
.filter(_._1 != peer)
.map(
_._2.peerMessageSender.flatMap(
@ -361,8 +361,9 @@ case class DataMessageHandler(
.map(_ => newDmh.copy(state = newState))
} else {
//if just one peer then can proceed ahead directly
fetchCompactFilterHeaders(newDmh).map(
_.copy(state = PostHeaderSync))
peerManager
.fetchCompactFilterHeaders(newDmh)
.map(_.copy(state = PostHeaderSync))
}
case headerState @ ValidatingHeaders(inSyncWith, _, _) =>
@ -377,8 +378,9 @@ case class DataMessageHandler(
// so we also check if our cached filter heights have been set as well, if they haven't then
// we probably need to sync filters
fetchCompactFilterHeaders(newDmh2).map(
_.copy(state = PostHeaderSync))
peerManager
.fetchCompactFilterHeaders(newDmh2)
.map(_.copy(state = PostHeaderSync))
} else {
//do nothing, we are still waiting for some peers to send headers or timeout
Future.successful(newDmh2)
@ -389,7 +391,9 @@ case class DataMessageHandler(
logger.info(
s"Starting to fetch filter headers in data message handler")
val newSyncingF =
sendFirstGetCompactFilterHeadersCommand(peerMsgSender)
PeerManager.sendFirstGetCompactFilterHeadersCommand(
peerMsgSender,
chainApi)
newSyncingF.map { newSyncing =>
val syncPeerOpt = if (newSyncing) {
syncPeer
@ -408,8 +412,9 @@ case class DataMessageHandler(
headerState.copy(inSyncWith = inSyncWith + peer)
val newDmh2 = newDmh.copy(state = newHeaderState)
if (newHeaderState.validated) {
fetchCompactFilterHeaders(newDmh2).map(
_.copy(state = PostHeaderSync))
peerManager
.fetchCompactFilterHeaders(newDmh2)
.map(_.copy(state = PostHeaderSync))
} else {
//do nothing, we are still waiting for some peers to send headers
Future.successful(newDmh2)
@ -517,7 +522,8 @@ case class DataMessageHandler(
if (headerHeight > filterHeaderCount) {
logger.info(
s"Starting to fetch filter headers in data message handler")
sendFirstGetCompactFilterHeadersCommand(peerMessageSender)
PeerManager.sendFirstGetCompactFilterHeadersCommand(peerMessageSender,
chainApi)
} else {
require(
headerHeight == filterHeaderCount,
@ -550,17 +556,17 @@ case class DataMessageHandler(
peerMsgSender: PeerMessageSender): Future[DataMessageHandler] = {
state match {
case HeaderSync =>
manager.peerDataMap(peer).updateInvalidMessageCount()
peerManager.peerDataMap(peer).updateInvalidMessageCount()
if (
manager
peerManager
.peerDataMap(peer)
.exceededMaxInvalidMessages && manager.peers.size > 1
.exceededMaxInvalidMessages && peerManager.peers.size > 1
) {
logger.info(
s"$peer exceeded max limit of invalid messages. Disconnecting.")
for {
_ <- manager.removePeer(peer)
newDmh <- manager.syncFromNewPeer()
_ <- peerManager.removePeer(peer)
newDmh <- peerManager.syncFromNewPeer()
} yield newDmh.copy(state = HeaderSync)
} else {
logger.info(s"Re-querying headers from $peer.")
@ -584,7 +590,9 @@ case class DataMessageHandler(
if (newHeaderState.validated) {
logger.info(
s"Done validating headers, inSyncWith=${newHeaderState.inSyncWith}, failedCheck=${newHeaderState.failedCheck}")
fetchCompactFilterHeaders(newDmh).map(_.copy(state = PostHeaderSync))
peerManager
.fetchCompactFilterHeaders(newDmh)
.map(_.copy(state = PostHeaderSync))
} else {
Future.successful(newDmh)
}
@ -594,40 +602,23 @@ case class DataMessageHandler(
}
}
// Pre-refactor version REMOVED by this commit (see hunk header above):
// identical logic now lives on PeerManager.fetchCompactFilterHeaders, with the
// `manager` indirection replaced by direct PeerManager access.
private def fetchCompactFilterHeaders(
currentDmh: DataMessageHandler): Future[DataMessageHandler] = {
for {
peer <- manager.randomPeerWithService(
ServiceIdentifier.NODE_COMPACT_FILTERS)
newDmh = currentDmh.copy(syncPeer = Some(peer))
_ = logger.info(s"Now syncing filter headers from $peer")
sender <- manager.peerDataMap(peer).peerMessageSender
newSyncing <- sendFirstGetCompactFilterHeadersCommand(sender)
} yield {
// keep the peer only if the request was actually sent
val syncPeerOpt = if (newSyncing) {
Some(peer)
} else {
None
}
newDmh.copy(syncPeer = syncPeerOpt)
}
}
def onHeaderRequestTimeout(peer: Peer): Future[DataMessageHandler] = {
logger.info(s"Header request timed out from $peer in state $state")
state match {
case HeaderSync =>
manager.syncFromNewPeer()
peerManager.syncFromNewPeer()
case headerState @ ValidatingHeaders(_, failedCheck, _) =>
val newHeaderState = headerState.copy(failedCheck = failedCheck + peer)
val newDmh = copy(state = newHeaderState)
if (newHeaderState.validated) {
fetchCompactFilterHeaders(newDmh).map(_.copy(state = PostHeaderSync))
peerManager
.fetchCompactFilterHeaders(newDmh)
.map(_.copy(state = PostHeaderSync))
} else Future.successful(newDmh)
case _: DataMessageHandlerState => Future.successful(this)
case PostHeaderSync => Future.successful(this)
}
}
@ -639,35 +630,6 @@ case class DataMessageHandler(
filterHeaderBatchSize = chainConfig.filterHeaderBatchSize,
prevStopHash = prevStopHash)
// Pre-refactor version REMOVED by this commit: the same logic now lives in
// PeerManager.sendFirstGetCompactFilterHeadersCommand with `chainApi` and the
// implicit configs passed explicitly instead of read from this instance.
private def sendFirstGetCompactFilterHeadersCommand(
peerMsgSender: PeerMessageSender): Future[Boolean] = {
for {
bestFilterHeaderOpt <-
chainApi
.getBestFilterHeader()
filterCount <- chainApi.getFilterCount()
// resume from the last synced filter header, or from genesis (empty hash)
blockHash = bestFilterHeaderOpt match {
case Some(filterHeaderDb) =>
filterHeaderDb.blockHashBE
case None =>
DoubleSha256DigestBE.empty
}
hashHeightOpt <- chainApi.nextBlockHeaderBatchRange(
prevStopHash = blockHash,
batchSize = chainConfig.filterHeaderBatchSize)
res <- hashHeightOpt match {
case Some(filterSyncMarker) =>
peerMsgSender
.sendGetCompactFilterHeadersMessage(filterSyncMarker)
.map(_ => true)
case None =>
sys.error(
s"Could not find block header in database to sync filter headers from! It's likely your database is corrupted blockHash=$blockHash bestFilterHeaderOpt=$bestFilterHeaderOpt filterCount=$filterCount")
}
} yield res
}
private def sendNextGetCompactFilterCommand(
peerMsgSender: PeerMessageSender,
startHeight: Int): Future[Boolean] =

View File

@ -7,7 +7,7 @@ import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.networking.peer.PeerMessageReceiverState._
import org.bitcoins.node.{Node, P2PLogger}
import org.bitcoins.node.P2PLogger
import scala.concurrent.Future
@ -17,8 +17,9 @@ import scala.concurrent.Future
* operations. This is the entry point for handling all received
* [[org.bitcoins.core.p2p.NetworkMessage NetworkMessage]]
*/
class PeerMessageReceiver(
node: Node,
case class PeerMessageReceiver(
controlMessageHandler: ControlMessageHandler,
dataMessageHandler: DataMessageHandler,
peer: Peer
)(implicit system: ActorSystem, nodeAppConfig: NodeAppConfig)
extends P2PLogger {
@ -88,9 +89,12 @@ class PeerMessageReceiver(
sender: PeerMessageSender): Future[PeerMessageReceiver] = {
//else it means we are receiving this data payload from a peer,
//we need to handle it
node.getDataMessageHandler
dataMessageHandler
.addToStream(payload, sender, peer)
.map(_ => new PeerMessageReceiver(node, peer))
.map(_ =>
new PeerMessageReceiver(controlMessageHandler,
dataMessageHandler,
peer))
}
/** Handles control payloads defined here https://bitcoin.org/en/developer-reference#control-messages
@ -104,7 +108,7 @@ class PeerMessageReceiver(
sender: PeerMessageSender,
curReceiverState: PeerMessageReceiverState): Future[
PeerMessageReceiverState] = {
node.controlMessageHandler
controlMessageHandler
.handleControlPayload(payload, sender, peer, curReceiverState)
}
}
@ -121,11 +125,4 @@ object PeerMessageReceiver {
case class NetworkMessageReceived(msg: NetworkMessage, client: P2PClient)
extends PeerMessageReceiverMsg
def apply(node: Node, peer: Peer)(implicit
system: ActorSystem,
nodeAppConfig: NodeAppConfig
): PeerMessageReceiver = {
new PeerMessageReceiver(node = node, peer = peer)
}
}

View File

@ -207,13 +207,14 @@ object NodeUnitTest extends P2PLogger {
walletCreationTimeOpt: Option[Instant])(implicit
appConfig: BitcoinSAppConfig,
system: ActorSystem): Future[PeerMessageReceiver] = {
val node = buildNode(peer, chainApi, walletCreationTimeOpt)(
appConfig.chainConf,
appConfig.nodeConf,
system)
val receiver =
PeerMessageReceiver(
node =
buildNode(peer, chainApi, walletCreationTimeOpt)(appConfig.chainConf,
appConfig.nodeConf,
system),
peer = peer)(system, appConfig.nodeConf)
PeerMessageReceiver(controlMessageHandler = node.controlMessageHandler,
dataMessageHandler = node.getDataMessageHandler,
peer = peer)(system, appConfig.nodeConf)
Future.successful(receiver)
}
@ -388,9 +389,10 @@ object NodeUnitTest extends P2PLogger {
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig,
system: ActorSystem): Future[PeerMessageReceiver] = {
val node = buildNode(peer, chainApi, walletCreationTimeOpt)
val receiver =
PeerMessageReceiver(node =
buildNode(peer, chainApi, walletCreationTimeOpt),
PeerMessageReceiver(controlMessageHandler = node.controlMessageHandler,
dataMessageHandler = node.getDataMessageHandler,
peer = peer)
Future.successful(receiver)
}