2023 05 05 encapsulate peermanager peermsgsender (#5066)

* Make PeerManager.peerDataMap private

* Remove PeerMessageSender as param to DataMessageHandler.addToStream()

* Remove PeerMessageSender parameter from DataMessageWrapper

* Add PeerMessageSenderApi

* Try adding a supervision strategy to the stream

* Empty commit to re-run CI

* Adjust log level down to try and get a better idea of whats happening on IC

* Add commandName to exception when we cannot find peerMessageSender in stream

* Try decreasing queue size to reduce async processing between being stream processing and actor receiving external p2p message

* Empty commit to re-run CI

* Increase max concurrent offers to nodeAppConfig.maxConnectedPeers

* Revert logging
This commit is contained in:
Chris Stewart 2023-05-08 15:17:36 -05:00 committed by GitHub
parent 09150e5a1f
commit 8dfb7d091f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 260 additions and 152 deletions

View File

@ -99,8 +99,9 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
def allDisconn: Future[Unit] = AsyncUtil.retryUntilSatisfied(
peers
.map(p =>
!peerManager.peerDataMap.contains(
p) && !peerManager.waitingForDeletion
!peerManager
.getPeerData(p)
.isDefined && !peerManager.waitingForDeletion
.contains(p))
.forall(_ == true),
maxTries = 5,

View File

@ -3,6 +3,7 @@ package org.bitcoins.node
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.p2p.{GetHeadersMessage, HeadersMessage}
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient.ExpectResponseCommand
import org.bitcoins.node.networking.peer.DataMessageHandlerState.{
@ -89,9 +90,8 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
GetHeadersMessage(node.chainConfig.chain.genesisHash))
//waiting for response to header query now
client <- node.peerManager
.peerDataMap(bitcoindPeers(0))
.peerMessageSender
.map(_.client)
.getPeerMsgSender(bitcoindPeers(0))
.map(_.get.client)
_ = client.actor ! expectHeaders
nodeUri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoinds(0))
_ <- bitcoinds(0).disconnectNode(nodeUri)
@ -160,11 +160,8 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
node.chainConfig))
invalidHeaderMessage = HeadersMessage(headers = Vector(invalidHeader))
sender <- node.peerManager.peerDataMap(peer).peerMessageSender
_ <- node.peerManager.getDataMessageHandler.addToStream(
invalidHeaderMessage,
sender,
peer)
_ <- node.peerManager.getDataMessageHandler
.addToStream(invalidHeaderMessage, peer)
bestChain = bitcoinds(1)
_ <- NodeTestUtil.awaitSync(node, bestChain)
} yield {
@ -180,17 +177,16 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
def sendInvalidHeaders(peer: Peer): Future[Unit] = {
val invalidHeaderMessage =
HeadersMessage(headers = Vector(invalidHeader))
val senderF = node.peerManager.peerDataMap(peer).peerMessageSender
for {
sender <- senderF
sendFs = 1
val sendFs = {
val count = 1
.to(node.nodeConfig.maxInvalidResponsesAllowed + 1)
.map(_ =>
node.peerManager.getDataMessageHandler
.addToStream(invalidHeaderMessage, sender, peer))
_ <- Future.sequence(sendFs)
} yield ()
FutureUtil.sequentially[Int, Unit](count) { _ =>
node.peerManager.getDataMessageHandler
.addToStream(invalidHeaderMessage, peer)
}
}
sendFs.map(_ => ())
}
for {

View File

@ -15,14 +15,9 @@ import org.bitcoins.core.p2p.ServiceIdentifier
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.DataMessageHandlerState.{
DoneSyncing,
MisbehavingPeer
}
import org.bitcoins.node.networking.peer.{
ControlMessageHandler,
DataMessageHandlerState,
SyncDataMessageHandlerState
DataMessageHandlerState
}
import java.time.Instant
@ -90,14 +85,13 @@ case class NeutrinoNode(
peerManager.getDataMessageHandler.copy(state =
DataMessageHandlerState.HeaderSync(syncPeer)))
for {
peerMsgSender <- peerManager.peerDataMap(syncPeer).peerMessageSender
header <- chainApi.getBestBlockHeader()
bestFilterHeaderOpt <- chainApi.getBestFilterHeader()
bestFilterOpt <- chainApi.getBestFilter()
blockchains <- blockchainsF
// Get all of our cached headers in case of a reorg
cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE.flip)
_ <- peerMsgSender.sendGetHeadersMessage(cachedHeaders)
cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE)
_ <- peerManager.sendGetHeadersMessage(cachedHeaders, Some(syncPeer))
hasStaleTip <- chainApi.isTipStale()
_ <- {
if (hasStaleTip) {
@ -144,10 +138,12 @@ case class NeutrinoNode(
//do nothing
Future.unit
} else {
syncCompactFilters(bestFilterHeader, chainApi, Some(bestFilter))
peerManager.syncCompactFilters(bestFilterHeader,
chainApi,
Some(bestFilter))
}
case (Some(bestFilterHeader), None) =>
syncCompactFilters(bestFilterHeader, chainApi, None)
peerManager.syncCompactFilters(bestFilterHeader, chainApi, None)
}
}
@ -159,63 +155,6 @@ case class NeutrinoNode(
} yield ()
}
/** Starts sync compact filer headers.
* Only starts syncing compact filters if our compact filter headers are in sync with block headers
*/
private def syncCompactFilters(
bestFilterHeader: CompactFilterHeaderDb,
chainApi: ChainApi,
bestFilterOpt: Option[CompactFilterDb]): Future[Unit] = {
val syncPeerMsgSenderOptF = {
peerManager.getDataMessageHandler.state match {
case syncState: SyncDataMessageHandlerState =>
val peerMsgSender =
peerManager.peerDataMap(syncState.syncPeer).peerMessageSender
Some(peerMsgSender)
case DoneSyncing | _: MisbehavingPeer => None
}
}
val sendCompactFilterHeaderMsgF = syncPeerMsgSenderOptF match {
case Some(syncPeerMsgSenderF) =>
syncPeerMsgSenderF.flatMap(
_.sendNextGetCompactFilterHeadersCommand(
chainApi = chainApi,
filterHeaderBatchSize = chainConfig.filterHeaderBatchSize,
prevStopHash = bestFilterHeader.blockHashBE)
)
case None => Future.successful(false)
}
sendCompactFilterHeaderMsgF.flatMap { isSyncFilterHeaders =>
// If we have started syncing filters
if (
!isSyncFilterHeaders &&
bestFilterOpt.isDefined &&
bestFilterOpt.get.hashBE != bestFilterHeader.filterHashBE
) {
syncPeerMsgSenderOptF match {
case Some(syncPeerMsgSenderF) =>
//means we are not syncing filter headers, and our filters are NOT
//in sync with our compact filter headers
syncPeerMsgSenderF.flatMap { sender =>
sender
.sendNextGetCompactFilterCommand(chainApi = chainApi,
filterBatchSize =
chainConfig.filterBatchSize,
startHeight =
bestFilterOpt.get.height)
.map(_ => ())
}
case None =>
logger.warn(
s"Not syncing compact filters since we do not have a syncPeer set, bestFilterOpt=$bestFilterOpt")
Future.unit
}
} else {
Future.unit
}
}
}
/** Gets the number of compact filters in the database */
override def getFilterCount(): Future[Int] =
chainApiFromDb().flatMap(_.getFilterCount())

View File

@ -70,8 +70,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
* `private[node]`.
*/
def send(msg: NetworkPayload, peer: Peer): Future[Unit] = {
val senderF = peerManager.peerDataMap(peer).peerMessageSender
senderF.flatMap(_.sendMsg(msg))
peerManager.sendMsg(msg, Some(peer))
}
/** Starts our node */
@ -195,20 +194,19 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
}
syncPeerOpt match {
case Some(peer) =>
peerManager
.peerDataMap(peer)
.peerMessageSender
.flatMap(_.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock,
blockHashes: _*))
peerManager.sendGetDataMessages(typeIdentifier =
TypeIdentifier.MsgWitnessBlock,
hashes = blockHashes.map(_.flip),
peerOpt = Some(peer))
case None =>
throw new RuntimeException(
"IBD not started yet. Cannot query for blocks.")
}
} else {
val peerMsgSenderF = peerManager.randomPeerMsgSenderWithService(
ServiceIdentifier.NODE_NETWORK)
peerMsgSenderF.flatMap(
_.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock, blockHashes: _*))
peerManager.sendGetDataMessages(typeIdentifier =
TypeIdentifier.MsgWitnessBlock,
hashes = blockHashes.map(_.flip),
peerOpt = None)
}
}

View File

@ -1,12 +1,13 @@
package org.bitcoins.node
import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
import akka.stream.OverflowStrategy
import akka.stream.{ActorAttributes, OverflowStrategy, Supervision}
import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.chain.db.{CompactFilterDb, CompactFilterHeaderDb}
import org.bitcoins.core.api.node.NodeType
import org.bitcoins.core.p2p._
import org.bitcoins.core.util.{NetworkUtil, StartStopAsync}
@ -16,7 +17,7 @@ import org.bitcoins.node.models.{Peer, PeerDAO, PeerDb}
import org.bitcoins.node.networking.peer._
import org.bitcoins.node.networking.P2PClientSupervisor
import org.bitcoins.node.networking.peer.DataMessageHandlerState._
import org.bitcoins.node.util.BitcoinSNodeUtil
import org.bitcoins.node.util.{BitcoinSNodeUtil, PeerMessageSenderApi}
import scodec.bits.ByteVector
import java.net.InetAddress
@ -35,6 +36,7 @@ case class PeerManager(
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig)
extends StartStopAsync[PeerManager]
with PeerMessageSenderApi
with P2PLogger {
private val _peerDataMap: mutable.Map[Peer, PeerData] = mutable.Map.empty
@ -80,6 +82,130 @@ case class PeerManager(
.map(_.toVector)
}
override def sendMsg(
msg: NetworkPayload,
peerOpt: Option[Peer]): Future[Unit] = {
val peerMsgSenderF = peerOpt match {
case Some(peer) =>
val peerMsgSenderF = peerDataMap(peer).peerMessageSender
peerMsgSenderF
case None =>
val peerMsgSenderF = randomPeerMsgSenderWithService(
ServiceIdentifier.NODE_NETWORK)
peerMsgSenderF
}
peerMsgSenderF.flatMap(_.sendMsg(msg))
}
/** Gossips the given message to all peers except the excluded peer. If None given as excluded peer, gossip message to all peers */
override def gossipMessage(
msg: NetworkPayload,
excludedPeerOpt: Option[Peer]): Future[Unit] = {
val gossipPeers = excludedPeerOpt match {
case Some(excludedPeer) =>
peerDataMap
.filterNot(_._1 == excludedPeer)
.map(_._1)
case None => peerDataMap.map(_._1)
}
Future
.traverse(gossipPeers)(p => sendMsg(msg, Some(p)))
.map(_ => ())
}
override def sendGetHeadersMessage(
hashes: Vector[DoubleSha256DigestBE],
peerOpt: Option[Peer]): Future[Unit] = {
val peerMsgSenderF = peerOpt match {
case Some(peer) =>
val peerMsgSenderF = peerDataMap(peer).peerMessageSender
peerMsgSenderF
case None =>
val peerMsgSenderF = randomPeerMsgSenderWithService(
ServiceIdentifier.NODE_NETWORK)
peerMsgSenderF
}
peerMsgSenderF.flatMap(_.sendGetHeadersMessage(hashes.map(_.flip)))
}
override def sendGetDataMessages(
typeIdentifier: TypeIdentifier,
hashes: Vector[DoubleSha256DigestBE],
peerOpt: Option[Peer]): Future[Unit] = {
peerOpt match {
case Some(peer) =>
val peerMsgSenderF = peerDataMap(peer).peerMessageSender
val flip = hashes.map(_.flip)
peerMsgSenderF
.flatMap(_.sendGetDataMessage(typeIdentifier, flip: _*))
case None =>
val peerMsgSenderF = randomPeerMsgSenderWithService(
ServiceIdentifier.NODE_NETWORK)
peerMsgSenderF.flatMap(
_.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock,
hashes.map(_.flip): _*))
}
}
/** Starts sync compact filer headers.
* Only starts syncing compact filters if our compact filter headers are in sync with block headers
*/
def syncCompactFilters(
bestFilterHeader: CompactFilterHeaderDb,
chainApi: ChainApi,
bestFilterOpt: Option[CompactFilterDb])(implicit
chainAppConfig: ChainAppConfig): Future[Unit] = {
val syncPeerMsgSenderOptF = {
getDataMessageHandler.state match {
case syncState: SyncDataMessageHandlerState =>
val peerMsgSender =
peerDataMap(syncState.syncPeer).peerMessageSender
Some(peerMsgSender)
case DoneSyncing | _: MisbehavingPeer => None
}
}
val sendCompactFilterHeaderMsgF = syncPeerMsgSenderOptF match {
case Some(syncPeerMsgSenderF) =>
syncPeerMsgSenderF.flatMap(
_.sendNextGetCompactFilterHeadersCommand(
chainApi = chainApi,
filterHeaderBatchSize = chainAppConfig.filterHeaderBatchSize,
prevStopHash = bestFilterHeader.blockHashBE)
)
case None => Future.successful(false)
}
sendCompactFilterHeaderMsgF.flatMap { isSyncFilterHeaders =>
// If we have started syncing filters
if (
!isSyncFilterHeaders &&
bestFilterOpt.isDefined &&
bestFilterOpt.get.hashBE != bestFilterHeader.filterHashBE
) {
syncPeerMsgSenderOptF match {
case Some(syncPeerMsgSenderF) =>
//means we are not syncing filter headers, and our filters are NOT
//in sync with our compact filter headers
syncPeerMsgSenderF.flatMap { sender =>
sender
.sendNextGetCompactFilterCommand(
chainApi = chainApi,
filterBatchSize = chainAppConfig.filterBatchSize,
startHeight = bestFilterOpt.get.height)
.map(_ => ())
}
case None =>
logger.warn(
s"Not syncing compact filters since we do not have a syncPeer set, bestFilterOpt=$bestFilterOpt")
Future.unit
}
} else {
Future.unit
}
}
}
def getPeerMsgSender(peer: Peer): Future[Option[PeerMessageSender]] = {
_peerDataMap.find(_._1 == peer).map(_._2.peerMessageSender) match {
case Some(peerMsgSender) => peerMsgSender.map(Some(_))
@ -223,7 +349,9 @@ case class PeerManager(
}
}
def peerDataMap: Map[Peer, PeerData] = _peerDataMap.toMap
private def peerDataMap: Map[Peer, PeerData] = _peerDataMap.toMap
def getPeerData(peer: Peer): Option[PeerData] = peerDataMap.get(peer)
override def stop(): Future[PeerManager] = {
logger.info(s"Stopping PeerManager")
@ -475,29 +603,38 @@ case class PeerManager(
}
private val dataMessageStreamSource = Source
.queue[StreamDataMessageWrapper](1500,
overflowStrategy =
OverflowStrategy.backpressure)
.queue[StreamDataMessageWrapper](
8,
overflowStrategy = OverflowStrategy.backpressure,
maxConcurrentOffers = nodeAppConfig.maxConnectedPeers)
.mapAsync(1) {
case msg @ DataMessageWrapper(payload, peerMsgSender, peer) =>
case msg @ DataMessageWrapper(payload, peer) =>
logger.debug(s"Got ${payload.commandName} from peer=${peer} in stream")
getDataMessageHandler
.handleDataPayload(payload, peerMsgSender, peer)
.flatMap { newDmh =>
newDmh.state match {
case m: MisbehavingPeer =>
updateDataMessageHandler(newDmh)
//disconnect the misbehaving peer
for {
_ <- removePeer(m.badPeer)
_ <- node.syncFromNewPeer()
} yield msg
case _: SyncDataMessageHandlerState | DoneSyncing =>
updateDataMessageHandler(newDmh)
Future.successful(msg)
}
val peerMsgSenderOptF = getPeerMsgSender(peer)
peerMsgSenderOptF.flatMap {
case None =>
Future.failed(new RuntimeException(
s"Couldn't find PeerMessageSender that corresponds with peer=$peer msg=${payload.commandName}. Was it disconnected?"))
case Some(peerMsgSender) =>
getDataMessageHandler
.handleDataPayload(payload, peerMsgSender, peer)
.flatMap { newDmh =>
newDmh.state match {
case m: MisbehavingPeer =>
updateDataMessageHandler(newDmh)
//disconnect the misbehaving peer
for {
_ <- removePeer(m.badPeer)
_ <- node.syncFromNewPeer()
} yield msg
case _: SyncDataMessageHandlerState | DoneSyncing =>
updateDataMessageHandler(newDmh)
Future.successful(msg)
}
}
}
}
case msg @ HeaderTimeoutWrapper(peer) =>
logger.debug(s"Processing timeout header for $peer")
onHeaderRequestTimeout(peer, getDataMessageHandler.state).map {
@ -510,13 +647,21 @@ case class PeerManager(
private val dataMessageStreamSink =
Sink.foreach[StreamDataMessageWrapper] {
case DataMessageWrapper(payload, _, peer) =>
case DataMessageWrapper(payload, peer) =>
logger.debug(s"Done processing ${payload.commandName} in peer=${peer}")
case HeaderTimeoutWrapper(_) =>
}
private val decider: Supervision.Decider = { case err: Throwable =>
logger.error(s"Error occurred while processing p2p pipeline stream", err)
Supervision.Resume
}
val dataMessageStream: SourceQueueWithComplete[StreamDataMessageWrapper] =
dataMessageStreamSource.to(dataMessageStreamSink).run()
dataMessageStreamSource
.to(dataMessageStreamSink)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.run()
def fetchCompactFilterHeaders(
currentDmh: DataMessageHandler): Future[DataMessageHandler] = {

View File

@ -48,12 +48,11 @@ case class DataMessageHandler(
copy(filterBatchCache = Set.empty, state = DoneSyncing)
}
def addToStream(
payload: DataPayload,
peerMsgSender: PeerMessageSender,
peer: Peer): Future[Unit] = {
val msg = DataMessageWrapper(payload, peerMsgSender, peer)
peerManager.dataMessageStream.offer(msg).map(_ => ())
def addToStream(payload: DataPayload, peer: Peer): Future[Unit] = {
val msg = DataMessageWrapper(payload, peer)
peerManager.dataMessageStream
.offer(msg)
.map(_ => ())
}
private def isChainIBD: Future[Boolean] = {
@ -476,12 +475,15 @@ case class DataMessageHandler(
peerMsgSender: PeerMessageSender): Future[DataMessageHandler] = {
val result = state match {
case HeaderSync(peer) =>
peerManager.peerDataMap(peer).updateInvalidMessageCount()
if (
peerManager
.peerDataMap(peer)
.exceededMaxInvalidMessages && peerManager.peers.size > 1
) {
val peerDataOpt = peerManager.getPeerData(peer)
val peerData = peerDataOpt match {
case Some(peerData) => peerData
case None =>
sys.error(
s"Cannot find peer we are syncing with in PeerManager, peer=$peer")
}
peerData.updateInvalidMessageCount()
if (peerData.exceededMaxInvalidMessages && peerManager.peers.size > 1) {
logger.warn(
s"$peer exceeded max limit of invalid messages. Disconnecting.")
@ -786,15 +788,12 @@ case class DataMessageHandler(
logger.info(
s"Starting to validate headers now. Verifying with ${newState.verifyingWith}")
val getHeadersAllF = peerManager.peerDataMap
.filter(_._1 != peer)
.map(
_._2.peerMessageSender.flatMap(
_.sendGetHeadersMessage(lastHash))
)
val getHeadersAllF = {
val msg = GetHeadersMessage(lastHash)
peerManager.gossipMessage(msg, excludedPeerOpt = Some(peer))
}
Future
.sequence(getHeadersAllF)
getHeadersAllF
.map(_ => newDmh.copy(state = newState))
} else {
//if just one peer then can proceed ahead directly
@ -856,10 +855,7 @@ case class DataMessageHandler(
sealed trait StreamDataMessageWrapper
case class DataMessageWrapper(
payload: DataPayload,
peerMsgSender: PeerMessageSender,
peer: Peer)
case class DataMessageWrapper(payload: DataPayload, peer: Peer)
extends StreamDataMessageWrapper
case class HeaderTimeoutWrapper(peer: Peer) extends StreamDataMessageWrapper

View File

@ -72,7 +72,7 @@ case class PeerMessageReceiver(
sender = peerMsgSender,
curReceiverState = curState)
case dataPayload: DataPayload =>
handleDataPayload(payload = dataPayload, sender = peerMsgSender)
handleDataPayload(payload = dataPayload)
.map(_ => curState)
}
}
@ -85,12 +85,11 @@ case class PeerMessageReceiver(
* @param sender
*/
private def handleDataPayload(
payload: DataPayload,
sender: PeerMessageSender): Future[PeerMessageReceiver] = {
payload: DataPayload): Future[PeerMessageReceiver] = {
//else it means we are receiving this data payload from a peer,
//we need to handle it
dataMessageHandler
.addToStream(payload, sender, peer)
.addToStream(payload, peer)
.map(_ =>
new PeerMessageReceiver(controlMessageHandler,
dataMessageHandler,

View File

@ -0,0 +1,34 @@
package org.bitcoins.node.util
import org.bitcoins.core.p2p.{NetworkPayload, TypeIdentifier}
import org.bitcoins.crypto.{DoubleSha256DigestBE}
import org.bitcoins.node.models.Peer
import scala.concurrent.Future
trait PeerMessageSenderApi {
def sendGetDataMessage(
typeIdentifier: TypeIdentifier,
hash: DoubleSha256DigestBE,
peerOpt: Option[Peer]): Future[Unit] = {
sendGetDataMessages(typeIdentifier, Vector(hash), peerOpt)
}
def sendGetDataMessages(
typeIdentifier: TypeIdentifier,
hashes: Vector[DoubleSha256DigestBE],
peerOpt: Option[Peer]): Future[Unit]
def sendGetHeadersMessage(
hashes: Vector[DoubleSha256DigestBE],
peerOpt: Option[Peer]): Future[Unit]
/** Gossips the given message to all peers except the excluded peer. If None given as excluded peer, gossip message to all peers */
def gossipMessage(
msg: NetworkPayload,
excludedPeerOpt: Option[Peer]): Future[Unit]
def sendMsg(msg: NetworkPayload, peerOpt: Option[Peer]): Future[Unit]
}