Make ControlMessageHandler take PeerManager rather than Node as a param (#5081)

* Make ControlMessageHandler take PeerManager rather than Node as a param

* refactor PeerData to not take a reference to Node

* Move ControlMessageHandler out of {Node,NeutrinoNode}
This commit is contained in:
Chris Stewart 2023-05-29 09:01:06 -05:00 committed by GitHub
parent f99113f2f8
commit 34df4ccbb1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 48 additions and 38 deletions

View file

@ -6,6 +6,7 @@ import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient.ConnectCommand
import org.bitcoins.node.networking.peer.{
ControlMessageHandler,
PeerMessageReceiver,
PeerMessageReceiverState
}
@ -103,9 +104,10 @@ class P2PClientActorTest
//piggy back off of node infra to setup p2p clients, but don't actually use
//the node itself so stop it here an clean up resources allocated by it
_ <- node.stop()
} yield PeerMessageReceiver(
controlMessageHandler = node.controlMessageHandler,
dataMessageHandler = node.peerManager.getDataMessageHandler,
controlMessageHandler = ControlMessageHandler(node.peerManager)
} yield PeerMessageReceiver(controlMessageHandler = controlMessageHandler,
dataMessageHandler =
node.peerManager.getDataMessageHandler,
peer = peer)
val clientActorF: Future[TestActorRef[P2PClientActor]] =

View file

@ -15,10 +15,7 @@ import org.bitcoins.core.p2p.ServiceIdentifier
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.{
ControlMessageHandler,
DataMessageHandlerState
}
import org.bitcoins.node.networking.peer.{DataMessageHandlerState}
import java.time.Instant
import scala.concurrent.Future
@ -41,8 +38,6 @@ case class NeutrinoNode(
implicit override def chainAppConfig: ChainAppConfig = chainConfig
val controlMessageHandler: ControlMessageHandler = ControlMessageHandler(this)
override lazy val peerManager: PeerManager =
PeerManager(paramPeers, this, walletCreationTimeOpt)

View file

@ -21,7 +21,6 @@ import org.bitcoins.node.networking.peer.DataMessageHandlerState.{
MisbehavingPeer
}
import org.bitcoins.node.networking.peer.{
ControlMessageHandler,
PeerMessageSender,
SyncDataMessageHandlerState
}
@ -42,9 +41,6 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
implicit def executionContext: ExecutionContext = system.dispatcher
def peerManager: PeerManager
def controlMessageHandler: ControlMessageHandler
def nodeCallbacks: NodeCallbacks = nodeAppConfig.callBacks
lazy val txDAO: BroadcastAbleTransactionDAO = BroadcastAbleTransactionDAO()

View file

@ -7,6 +7,7 @@ import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.networking.peer.{
ControlMessageHandler,
PeerMessageReceiver,
PeerMessageReceiverState,
PeerMessageSender
@ -19,7 +20,8 @@ import scala.concurrent.duration.DurationInt
*/
case class PeerData(
peer: Peer,
node: Node,
controlMessageHandler: ControlMessageHandler,
peerManager: PeerManager,
supervisor: ActorRef
)(implicit
system: ActorSystem,
@ -33,18 +35,18 @@ case class PeerData(
private lazy val client: Future[P2PClient] = {
val peerMessageReceiver =
PeerMessageReceiver(node.controlMessageHandler,
node.peerManager.getDataMessageHandler,
PeerMessageReceiver(controlMessageHandler,
peerManager.getDataMessageHandler,
peer)
P2PClient(
peer = peer,
peerMessageReceiver = peerMessageReceiver,
peerMsgRecvState = PeerMessageReceiverState.fresh(),
onReconnect = node.peerManager.onReconnect,
onStop = node.peerManager.onP2PClientStopped,
onInitializationTimeout = node.peerManager.onInitializationTimeout,
node.peerManager.onQueryTimeout,
node.peerManager.sendResponseTimeout,
onReconnect = peerManager.onReconnect,
onStop = peerManager.onP2PClientStopped,
onInitializationTimeout = peerManager.onInitializationTimeout,
onQueryTimeout = peerManager.onQueryTimeout,
sendResponseTimeout = peerManager.sendResponseTimeout,
maxReconnectionTries = 4,
supervisor = supervisor
)

View file

@ -7,6 +7,7 @@ import org.bitcoins.core.p2p.ServiceIdentifier
import org.bitcoins.core.util.{NetworkUtil, StartStopAsync}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.{Peer, PeerDAO, PeerDb}
import org.bitcoins.node.networking.peer.ControlMessageHandler
import java.net.{InetAddress, UnknownHostException}
import java.util.concurrent.atomic.AtomicBoolean
@ -18,7 +19,8 @@ import scala.util.{Failure, Random, Success}
case class PeerFinder(
paramPeers: Vector[Peer],
node: NeutrinoNode,
controlMessageHandler: ControlMessageHandler,
peerManager: PeerManager,
skipPeers: () => Vector[Peer],
supervisor: ActorRef)(implicit
ec: ExecutionContext,
@ -216,12 +218,16 @@ case class PeerFinder(
/** creates and initialises a new test peer */
private def tryPeer(peer: Peer): Future[Unit] = {
_peerData.put(peer, PeerData(peer, node, supervisor))
_peerData.put(
peer,
PeerData(peer, controlMessageHandler, peerManager, supervisor))
_peerData(peer).peerMessageSender.map(_.connect())
}
private def tryToReconnectPeer(peer: Peer): Future[Unit] = {
_peerData.put(peer, PeerData(peer, node, supervisor))
_peerData.put(
peer,
PeerData(peer, controlMessageHandler, peerManager, supervisor))
_peerData(peer).peerMessageSender.map(_.reconnect())
}

View file

@ -65,7 +65,8 @@ case class PeerManager(
private val finder: PeerFinder =
PeerFinder(paramPeers = paramPeers,
node = node,
controlMessageHandler = ControlMessageHandler(this),
peerManager = this,
skipPeers = () => peers,
supervisor = supervisor)

View file

@ -2,14 +2,17 @@ package org.bitcoins.node.networking.peer
import org.bitcoins.core.p2p._
import org.bitcoins.core.util.NetworkUtil
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.PeerMessageReceiverState._
import org.bitcoins.node.{Node, P2PLogger}
import org.bitcoins.node.{P2PLogger, PeerManager}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
case class ControlMessageHandler(node: Node)(implicit ec: ExecutionContext)
case class ControlMessageHandler(peerManager: PeerManager)(implicit
ec: ExecutionContext,
nodeAppConfig: NodeAppConfig)
extends P2PLogger {
def handleControlPayload(
@ -33,7 +36,7 @@ case class ControlMessageHandler(node: Node)(implicit ec: ExecutionContext)
case good: Initializing =>
val newState = good.withVersionMsg(versionMsg)
node.peerManager.onVersionMessage(peer, versionMsg)
peerManager.onVersionMessage(peer, versionMsg)
sender.sendVerackMessage().map(_ => newState)
}
@ -50,7 +53,7 @@ case class ControlMessageHandler(node: Node)(implicit ec: ExecutionContext)
case good: Initializing =>
val newState = good.toNormal(VerAckMessage)
node.peerManager.onInitialization(peer).map(_ => newState)
peerManager.onInitialization(peer).map(_ => newState)
}
case ping: PingMessage =>
@ -92,8 +95,8 @@ case class ControlMessageHandler(node: Node)(implicit ec: ExecutionContext)
NetworkUtil.parseInetSocketAddress(bytes, networkAddress.port)
val peer = Peer.fromSocket(socket = inetAddress,
socks5ProxyParams =
node.nodeAppConfig.socks5ProxyParams)
node.peerManager.addPeerToTry(Vector(peer), 0)
nodeAppConfig.socks5ProxyParams)
peerManager.addPeerToTry(Vector(peer), 0)
}
case addr: AddrV2Message =>
val bytes = addr.bytes
@ -103,14 +106,14 @@ case class ControlMessageHandler(node: Node)(implicit ec: ExecutionContext)
NetworkUtil.parseInetSocketAddress(bytes, port)
val peer = Peer.fromSocket(socket = inetAddress,
socks5ProxyParams =
node.nodeAppConfig.socks5ProxyParams)
nodeAppConfig.socks5ProxyParams)
val priority = if (services.nodeCompactFilters) 1 else 0
addr match {
case IPv4AddrV2Message(_, _, _, _) | IPv6AddrV2Message(_, _, _, _) =>
node.peerManager.addPeerToTry(Vector(peer), priority = priority)
peerManager.addPeerToTry(Vector(peer), priority = priority)
case TorV3AddrV2Message(_, _, _, _) =>
if (node.nodeAppConfig.torConf.enabled)
node.peerManager.addPeerToTry(Vector(peer), priority)
if (nodeAppConfig.torConf.enabled)
peerManager.addPeerToTry(Vector(peer), priority)
case _ => logger.debug(s"Unsupported network. Skipping.")
}
}

View file

@ -211,8 +211,11 @@ object NodeUnitTest extends P2PLogger {
appConfig.chainConf,
appConfig.nodeConf,
system)
val controlMessageHandler = ControlMessageHandler(node.peerManager)(
system.dispatcher,
appConfig.nodeConf)
val receiver =
PeerMessageReceiver(controlMessageHandler = node.controlMessageHandler,
PeerMessageReceiver(controlMessageHandler = controlMessageHandler,
dataMessageHandler =
node.peerManager.getDataMessageHandler,
peer = peer)(system, appConfig.nodeConf)
@ -368,8 +371,10 @@ object NodeUnitTest extends P2PLogger {
chainAppConfig: ChainAppConfig,
system: ActorSystem): Future[PeerMessageReceiver] = {
val node = buildNode(peer, chainApi, walletCreationTimeOpt)
val controlMessageHandler =
ControlMessageHandler(node.peerManager)(system.dispatcher, nodeAppConfig)
val receiver =
PeerMessageReceiver(controlMessageHandler = node.controlMessageHandler,
PeerMessageReceiver(controlMessageHandler = controlMessageHandler,
dataMessageHandler =
node.peerManager.getDataMessageHandler,
peer = peer)