2023 04 22 peermanager dmh refactor (#5057)

* WIP: Move DataMessageHandler into PeerManager

* Get things compiling

* Turn off logging
This commit is contained in:
Chris Stewart 2023-04-24 08:14:02 -05:00 committed by GitHub
parent 1461782865
commit ce6d2212c1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 79 additions and 92 deletions

View file

@ -63,8 +63,7 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
nodeConnectedWithBitcoinds => nodeConnectedWithBitcoinds =>
val node = nodeConnectedWithBitcoinds.node val node = nodeConnectedWithBitcoinds.node
val bitcoinds = nodeConnectedWithBitcoinds.bitcoinds val bitcoinds = nodeConnectedWithBitcoinds.bitcoinds
val peerManager = node.peerManager def peers = node.peerManager.peers
def peers = peerManager.peers
for { for {
bitcoindPeers <- bitcoinPeersF bitcoindPeers <- bitcoinPeersF
@ -72,15 +71,15 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
maxTries = 30, maxTries = 30,
interval = 1.second) interval = 1.second)
//sync from first bitcoind //sync from first bitcoind
_ = node.updateDataMessageHandler( _ = node.peerManager.updateDataMessageHandler(
node.getDataMessageHandler.copy(syncPeer = Some(bitcoindPeers(0)))( node.peerManager.getDataMessageHandler.copy(syncPeer =
executionContext, Some(bitcoindPeers(0)))(executionContext,
node.nodeAppConfig, node.nodeAppConfig,
node.chainAppConfig)) node.chainAppConfig))
expectHeaders = ExpectResponseCommand( expectHeaders = ExpectResponseCommand(
GetHeadersMessage(node.chainConfig.chain.genesisHash)) GetHeadersMessage(node.chainConfig.chain.genesisHash))
//waiting for response to header query now //waiting for response to header query now
client <- peerManager client <- node.peerManager
.peerDataMap(bitcoindPeers(0)) .peerDataMap(bitcoindPeers(0))
.peerMessageSender .peerMessageSender
.map(_.client) .map(_.client)
@ -89,7 +88,7 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
_ <- bitcoinds(0).disconnectNode(nodeUri) _ <- bitcoinds(0).disconnectNode(nodeUri)
_ = logger.info(s"Disconnected $nodeUri from bitcoind") _ = logger.info(s"Disconnected $nodeUri from bitcoind")
//old peer we were syncing with that just disconnected us //old peer we were syncing with that just disconnected us
oldSyncPeer = node.getDataMessageHandler.syncPeer.get oldSyncPeer = node.peerManager.getDataMessageHandler.syncPeer.get
_ <- NodeTestUtil.awaitAllSync(node, bitcoinds(1)) _ <- NodeTestUtil.awaitAllSync(node, bitcoinds(1))
expectedSyncPeer = bitcoindPeers(1) expectedSyncPeer = bitcoindPeers(1)
} yield { } yield {
@ -133,17 +132,18 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
_ <- AsyncUtil.retryUntilSatisfied(node.peerManager.peers.size == 2) _ <- AsyncUtil.retryUntilSatisfied(node.peerManager.peers.size == 2)
peers <- bitcoinPeersF peers <- bitcoinPeersF
peer = peers.head peer = peers.head
_ = node.updateDataMessageHandler( _ = node.peerManager.updateDataMessageHandler(
node.getDataMessageHandler.copy(syncPeer = Some(peer))( node.peerManager.getDataMessageHandler.copy(syncPeer = Some(peer))(
executionContext, executionContext,
node.nodeConfig, node.nodeConfig,
node.chainConfig)) node.chainConfig))
invalidHeaderMessage = HeadersMessage(headers = Vector(invalidHeader)) invalidHeaderMessage = HeadersMessage(headers = Vector(invalidHeader))
sender <- node.peerManager.peerDataMap(peer).peerMessageSender sender <- node.peerManager.peerDataMap(peer).peerMessageSender
_ <- node.getDataMessageHandler.addToStream(invalidHeaderMessage, _ <- node.peerManager.getDataMessageHandler.addToStream(
sender, invalidHeaderMessage,
peer) sender,
peer)
bestChain = bitcoinds(1) bestChain = bitcoinds(1)
_ <- NodeTestUtil.awaitSync(node, bestChain) _ <- NodeTestUtil.awaitSync(node, bestChain)
} yield { } yield {
@ -166,9 +166,8 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
sendFs = 1 sendFs = 1
.to(node.nodeConfig.maxInvalidResponsesAllowed + 1) .to(node.nodeConfig.maxInvalidResponsesAllowed + 1)
.map(_ => .map(_ =>
node.getDataMessageHandler.addToStream(invalidHeaderMessage, node.peerManager.getDataMessageHandler
sender, .addToStream(invalidHeaderMessage, sender, peer))
peer))
_ <- Future.sequence(sendFs) _ <- Future.sequence(sendFs)
} yield () } yield ()
} }
@ -178,8 +177,8 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
peers <- bitcoinPeersF peers <- bitcoinPeersF
peer = peers(0) peer = peers(0)
_ <- node.peerManager.isConnected(peer).map(assert(_)) _ <- node.peerManager.isConnected(peer).map(assert(_))
_ = node.updateDataMessageHandler( _ = node.peerManager.updateDataMessageHandler(
node.getDataMessageHandler.copy(syncPeer = Some(peer))( node.peerManager.getDataMessageHandler.copy(syncPeer = Some(peer))(
executionContext, executionContext,
node.nodeConfig, node.nodeConfig,
node.chainConfig)) node.chainConfig))

View file

@ -102,7 +102,7 @@ class P2PClientActorTest
node <- NodeUnitTest.buildNode(peer, None) node <- NodeUnitTest.buildNode(peer, None)
} yield PeerMessageReceiver( } yield PeerMessageReceiver(
controlMessageHandler = node.controlMessageHandler, controlMessageHandler = node.controlMessageHandler,
dataMessageHandler = node.getDataMessageHandler, dataMessageHandler = node.peerManager.getDataMessageHandler,
peer = peer) peer = peer)
val clientActorF: Future[TestActorRef[P2PClientActor]] = val clientActorF: Future[TestActorRef[P2PClientActor]] =

View file

@ -15,15 +15,10 @@ import org.bitcoins.core.p2p.ServiceIdentifier
import org.bitcoins.core.protocol.BlockStamp import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.DataMessageHandlerState.HeaderSync import org.bitcoins.node.networking.peer.{ControlMessageHandler}
import org.bitcoins.node.networking.peer.{
ControlMessageHandler,
DataMessageHandler
}
import java.time.Instant import java.time.Instant
import scala.concurrent.duration.DurationInt import scala.concurrent.{Future}
import scala.concurrent.{Await, Future}
case class NeutrinoNode( case class NeutrinoNode(
chainApi: ChainApi, chainApi: ChainApi,
@ -45,33 +40,8 @@ case class NeutrinoNode(
val controlMessageHandler: ControlMessageHandler = ControlMessageHandler(this) val controlMessageHandler: ControlMessageHandler = ControlMessageHandler(this)
private var dataMessageHandler: DataMessageHandler = { override lazy val peerManager: PeerManager =
val result = for { PeerManager(paramPeers, this, walletCreationTimeOpt)
chainApi <- chainApiFromDb()
} yield {
DataMessageHandler(
chainApi = chainApi,
walletCreationTimeOpt = walletCreationTimeOpt,
peerManager = peerManager,
state = HeaderSync,
initialSyncDone = None,
filterBatchCache = Set.empty,
syncPeer = None
)
}
Await.result(result, 10.seconds)
}
override def getDataMessageHandler: DataMessageHandler = dataMessageHandler
override def updateDataMessageHandler(
dataMessageHandler: DataMessageHandler): NeutrinoNode = {
this.dataMessageHandler = dataMessageHandler
this
}
override lazy val peerManager: PeerManager = PeerManager(paramPeers, this)
override def start(): Future[NeutrinoNode] = { override def start(): Future[NeutrinoNode] = {
val res = for { val res = for {
@ -102,8 +72,8 @@ case class NeutrinoNode(
syncPeer <- peerManager.randomPeerWithService( syncPeer <- peerManager.randomPeerWithService(
ServiceIdentifier.NODE_COMPACT_FILTERS) ServiceIdentifier.NODE_COMPACT_FILTERS)
_ = logger.info(s"Syncing with $syncPeer") _ = logger.info(s"Syncing with $syncPeer")
_ = updateDataMessageHandler( _ = peerManager.updateDataMessageHandler(
dataMessageHandler.copy(syncPeer = Some(syncPeer))) peerManager.getDataMessageHandler.copy(syncPeer = Some(syncPeer)))
peerMsgSender <- peerManager.peerDataMap(syncPeer).peerMessageSender peerMsgSender <- peerManager.peerDataMap(syncPeer).peerMessageSender
header <- chainApi.getBestBlockHeader() header <- chainApi.getBestBlockHeader()
bestFilterHeaderOpt <- chainApi.getBestFilterHeader() bestFilterHeaderOpt <- chainApi.getBestFilterHeader()
@ -165,6 +135,13 @@ case class NeutrinoNode(
} }
} }
override def syncFromNewPeer(): Future[Unit] = {
logger.info(s"Trying to sync from new peer")
val _ = peerManager.updateDataMessageHandler(
peerManager.getDataMessageHandler.reset)
sync().map(_ => ())
}
/** Starts sync compact filer headers. /** Starts sync compact filer headers.
* Only starts syncing compact filters if our compact filter headers are in sync with block headers * Only starts syncing compact filters if our compact filter headers are in sync with block headers
*/ */
@ -173,7 +150,7 @@ case class NeutrinoNode(
chainApi: ChainApi, chainApi: ChainApi,
bestFilterOpt: Option[CompactFilterDb]): Future[Unit] = { bestFilterOpt: Option[CompactFilterDb]): Future[Unit] = {
val syncPeerMsgSenderOptF = { val syncPeerMsgSenderOptF = {
dataMessageHandler.syncPeer.map { peer => peerManager.getDataMessageHandler.syncPeer.map { peer =>
peerManager.peerDataMap(peer).peerMessageSender peerManager.peerDataMap(peer).peerMessageSender
} }
} }

View file

@ -18,7 +18,6 @@ import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models._ import org.bitcoins.node.models._
import org.bitcoins.node.networking.peer.{ import org.bitcoins.node.networking.peer.{
ControlMessageHandler, ControlMessageHandler,
DataMessageHandler,
PeerMessageSender PeerMessageSender
} }
@ -39,22 +38,12 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
def peerManager: PeerManager def peerManager: PeerManager
/** The current data message handler.
* It should be noted that the dataMessageHandler contains
* chainstate. When we update with a new chainstate, we need to
* make sure we update the [[DataMessageHandler]] via [[updateDataMessageHandler()]]
* to make sure we don't corrupt our chainstate cache
*/
def getDataMessageHandler: DataMessageHandler
def controlMessageHandler: ControlMessageHandler def controlMessageHandler: ControlMessageHandler
def nodeCallbacks: NodeCallbacks = nodeAppConfig.callBacks def nodeCallbacks: NodeCallbacks = nodeAppConfig.callBacks
lazy val txDAO: BroadcastAbleTransactionDAO = BroadcastAbleTransactionDAO() lazy val txDAO: BroadcastAbleTransactionDAO = BroadcastAbleTransactionDAO()
def updateDataMessageHandler(dataMessageHandler: DataMessageHandler): Node
/** This is constructing a chain api from disk every time we call this method /** This is constructing a chain api from disk every time we call this method
* This involves database calls which can be slow and expensive to construct * This involves database calls which can be slow and expensive to construct
* our [[org.bitcoins.chain.blockchain.Blockchain Blockchain]] * our [[org.bitcoins.chain.blockchain.Blockchain Blockchain]]
@ -134,6 +123,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
*/ */
def sync(): Future[Unit] def sync(): Future[Unit]
def syncFromNewPeer(): Future[Unit]
/** Broadcasts the given transaction over the P2P network */ /** Broadcasts the given transaction over the P2P network */
override def broadcastTransactions( override def broadcastTransactions(
transactions: Vector[Transaction]): Future[Unit] = { transactions: Vector[Transaction]): Future[Unit] = {
@ -193,7 +184,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
isIBD: Boolean, isIBD: Boolean,
blockHashes: Vector[DoubleSha256Digest]): Future[Unit] = { blockHashes: Vector[DoubleSha256Digest]): Future[Unit] = {
if (isIBD) { if (isIBD) {
val syncPeerOpt = getDataMessageHandler.syncPeer val syncPeerOpt = peerManager.getDataMessageHandler.syncPeer
syncPeerOpt match { syncPeerOpt match {
case Some(peer) => case Some(peer) =>
peerManager peerManager

View file

@ -34,7 +34,7 @@ case class PeerData(
private lazy val client: Future[P2PClient] = { private lazy val client: Future[P2PClient] = {
val peerMessageReceiver = val peerMessageReceiver =
PeerMessageReceiver(node.controlMessageHandler, PeerMessageReceiver(node.controlMessageHandler,
node.getDataMessageHandler, node.peerManager.getDataMessageHandler,
peer) peer)
P2PClient( P2PClient(
peer = peer, peer = peer,

View file

@ -4,6 +4,7 @@ import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
import akka.stream.OverflowStrategy import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete} import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}
import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.blockchain.{ChainHandler}
import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.chain.ChainApi import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.node.NodeType import org.bitcoins.core.api.node.NodeType
@ -19,7 +20,7 @@ import org.bitcoins.node.util.BitcoinSNodeUtil
import scodec.bits.ByteVector import scodec.bits.ByteVector
import java.net.InetAddress import java.net.InetAddress
import java.time.Duration import java.time.{Duration, Instant}
import scala.collection.mutable import scala.collection.mutable
import scala.concurrent.duration.DurationInt import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future, Promise} import scala.concurrent.{ExecutionContext, Future, Promise}
@ -27,7 +28,8 @@ import scala.util.Random
case class PeerManager( case class PeerManager(
paramPeers: Vector[Peer] = Vector.empty, paramPeers: Vector[Peer] = Vector.empty,
node: NeutrinoNode)(implicit node: NeutrinoNode,
walletCreationTimeOpt: Option[Instant])(implicit
ec: ExecutionContext, ec: ExecutionContext,
system: ActorSystem, system: ActorSystem,
nodeAppConfig: NodeAppConfig, nodeAppConfig: NodeAppConfig,
@ -363,9 +365,9 @@ case class PeerManager(
//actor stopped for one of the persistent peers, can happen in case a reconnection attempt failed due to //actor stopped for one of the persistent peers, can happen in case a reconnection attempt failed due to
//reconnection tries exceeding the max limit in which the client was stopped to disconnect from it, remove it //reconnection tries exceeding the max limit in which the client was stopped to disconnect from it, remove it
_peerDataMap.remove(peer) _peerDataMap.remove(peer)
val syncPeer = node.getDataMessageHandler.syncPeer val syncPeer = getDataMessageHandler.syncPeer
if (peers.length > 1 && syncPeer.isDefined && syncPeer.get == peer) { if (peers.length > 1 && syncPeer.isDefined && syncPeer.get == peer) {
syncFromNewPeer().map(_ => ()) node.syncFromNewPeer().map(_ => ())
} else if (syncPeer.isEmpty) { } else if (syncPeer.isEmpty) {
Future.unit Future.unit
} else { } else {
@ -410,8 +412,8 @@ case class PeerManager(
case _: GetHeadersMessage => case _: GetHeadersMessage =>
dataMessageStream.offer(HeaderTimeoutWrapper(peer)).map(_ => ()) dataMessageStream.offer(HeaderTimeoutWrapper(peer)).map(_ => ())
case _ => case _ =>
if (peer == node.getDataMessageHandler.syncPeer.get) if (peer == getDataMessageHandler.syncPeer.get)
syncFromNewPeer().map(_ => ()) node.syncFromNewPeer().map(_ => ())
else Future.unit else Future.unit
} }
} }
@ -421,24 +423,27 @@ case class PeerManager(
Future.unit Future.unit
} }
def syncFromNewPeer(): Future[DataMessageHandler] =
node.syncFromNewPeer().map(_ => getDataMessageHandler)
private def onHeaderRequestTimeout( private def onHeaderRequestTimeout(
peer: Peer, peer: Peer,
state: DataMessageHandlerState): Future[DataMessageHandler] = { state: DataMessageHandlerState): Future[DataMessageHandler] = {
logger.info(s"Header request timed out from $peer in state $state") logger.info(s"Header request timed out from $peer in state $state")
state match { state match {
case HeaderSync => case HeaderSync =>
syncFromNewPeer() node.syncFromNewPeer().map(_ => getDataMessageHandler)
case headerState @ ValidatingHeaders(_, failedCheck, _) => case headerState @ ValidatingHeaders(_, failedCheck, _) =>
val newHeaderState = headerState.copy(failedCheck = failedCheck + peer) val newHeaderState = headerState.copy(failedCheck = failedCheck + peer)
val newDmh = node.getDataMessageHandler.copy(state = newHeaderState) val newDmh = getDataMessageHandler.copy(state = newHeaderState)
if (newHeaderState.validated) { if (newHeaderState.validated) {
fetchCompactFilterHeaders(newDmh) fetchCompactFilterHeaders(newDmh)
.map(_.copy(state = PostHeaderSync)) .map(_.copy(state = PostHeaderSync))
} else Future.successful(newDmh) } else Future.successful(newDmh)
case PostHeaderSync => Future.successful(node.getDataMessageHandler) case PostHeaderSync => Future.successful(getDataMessageHandler)
} }
} }
@ -454,13 +459,6 @@ case class PeerManager(
} }
} }
def syncFromNewPeer(): Future[DataMessageHandler] = {
logger.info(s"Trying to sync from new peer")
val newNode =
node.updateDataMessageHandler(node.getDataMessageHandler.reset)
newNode.sync().map(_ => node.getDataMessageHandler)
}
private val dataMessageStreamSource = Source private val dataMessageStreamSource = Source
.queue[StreamDataMessageWrapper](1500, .queue[StreamDataMessageWrapper](1500,
overflowStrategy = overflowStrategy =
@ -468,17 +466,17 @@ case class PeerManager(
.mapAsync(1) { .mapAsync(1) {
case msg @ DataMessageWrapper(payload, peerMsgSender, peer) => case msg @ DataMessageWrapper(payload, peerMsgSender, peer) =>
logger.debug(s"Got ${payload.commandName} from peer=${peer} in stream") logger.debug(s"Got ${payload.commandName} from peer=${peer} in stream")
node.getDataMessageHandler getDataMessageHandler
.handleDataPayload(payload, peerMsgSender, peer) .handleDataPayload(payload, peerMsgSender, peer)
.map { newDmh => .map { newDmh =>
node.updateDataMessageHandler(newDmh) updateDataMessageHandler(newDmh)
msg msg
} }
case msg @ HeaderTimeoutWrapper(peer) => case msg @ HeaderTimeoutWrapper(peer) =>
logger.debug(s"Processing timeout header for $peer") logger.debug(s"Processing timeout header for $peer")
onHeaderRequestTimeout(peer, node.getDataMessageHandler.state).map { onHeaderRequestTimeout(peer, getDataMessageHandler.state).map {
newDmh => newDmh =>
node.updateDataMessageHandler(newDmh) updateDataMessageHandler(newDmh)
logger.debug(s"Done processing timeout header for $peer") logger.debug(s"Done processing timeout header for $peer")
msg msg
} }
@ -514,6 +512,26 @@ case class PeerManager(
} }
} }
private var dataMessageHandler: DataMessageHandler = {
DataMessageHandler(
chainApi = ChainHandler.fromDatabase(),
walletCreationTimeOpt = walletCreationTimeOpt,
peerManager = this,
state = HeaderSync,
initialSyncDone = None,
filterBatchCache = Set.empty,
syncPeer = None
)
}
def getDataMessageHandler: DataMessageHandler = dataMessageHandler
def updateDataMessageHandler(
dataMessageHandler: DataMessageHandler): PeerManager = {
this.dataMessageHandler = dataMessageHandler
this
}
} }
case class ResponseTimeout(payload: NetworkPayload) case class ResponseTimeout(payload: NetworkPayload)

View file

@ -213,7 +213,8 @@ object NodeUnitTest extends P2PLogger {
system) system)
val receiver = val receiver =
PeerMessageReceiver(controlMessageHandler = node.controlMessageHandler, PeerMessageReceiver(controlMessageHandler = node.controlMessageHandler,
dataMessageHandler = node.getDataMessageHandler, dataMessageHandler =
node.peerManager.getDataMessageHandler,
peer = peer)(system, appConfig.nodeConf) peer = peer)(system, appConfig.nodeConf)
Future.successful(receiver) Future.successful(receiver)
} }
@ -392,7 +393,8 @@ object NodeUnitTest extends P2PLogger {
val node = buildNode(peer, chainApi, walletCreationTimeOpt) val node = buildNode(peer, chainApi, walletCreationTimeOpt)
val receiver = val receiver =
PeerMessageReceiver(controlMessageHandler = node.controlMessageHandler, PeerMessageReceiver(controlMessageHandler = node.controlMessageHandler,
dataMessageHandler = node.getDataMessageHandler, dataMessageHandler =
node.peerManager.getDataMessageHandler,
peer = peer) peer = peer)
Future.successful(receiver) Future.successful(receiver)
} }