Remove offer(SendToPeer) from PeerManager queue (#5119)

* Remove offer(SendToPeer) from PeerManager queue

* scalafmt

* Remove comments
This commit is contained in:
Chris Stewart 2023-06-28 07:58:34 -05:00 committed by GitHub
parent 08a76fb040
commit 6befad2dd3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 86 deletions

View file

@ -1,11 +1,11 @@
package org.bitcoins.node
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.p2p.{GetHeadersMessage, HeadersMessage, NetworkMessage}
import org.bitcoins.core.p2p.{GetHeadersMessage, HeadersMessage}
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.{DataMessageWrapper, SendToPeer}
import org.bitcoins.node.networking.peer.DataMessageWrapper
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.node.fixture.NeutrinoNodeConnectedWithBitcoinds
@ -76,8 +76,11 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
networkPayload =
GetHeadersMessage(node.chainConfig.chain.genesisHash)
//waiting for response to header query now
networkMessage = NetworkMessage(networkParam, networkPayload)
_ <- node.peerManager.offer(SendToPeer(networkMessage, Some(peer0)))
_ <- node.peerManager
.getPeerData(peer0)
.get
.peerMessageSenderApi
.sendMsg(networkPayload, Some(peer0))
nodeUri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoinds(0))
_ <- bitcoinds(0).disconnectNode(nodeUri)
_ = logger.debug(

View file

@ -104,8 +104,46 @@ case class PeerManager(
msg: NetworkPayload,
peerOpt: Option[Peer]): Future[Unit] = {
val networkMessage = NetworkMessage(nodeAppConfig.network, msg)
offer(SendToPeer(msg = networkMessage, peerOpt = peerOpt))
.map(_ => ())
val sendToPeer = SendToPeer(networkMessage, peerOpt)
logger.debug(
s"Sending message ${sendToPeer.msg.payload.commandName} to peerOpt=${sendToPeer.peerOpt}")
val peerMsgSenderOptF: Future[Option[PeerMessageSender]] =
sendToPeer.peerOpt match {
case Some(peer) =>
getPeerMsgSender(peer).flatMap {
case Some(peerMsgSender) =>
Future.successful(Some(peerMsgSender))
case None =>
sendToPeer.msg.payload match {
case _: ControlPayload =>
//peer may not be fully initialized, we may be doing the handshake with a peer
finderOpt.get
.getData(peer)
.map(_.peerMessageSender) match {
case Some(p) => p.map(Some(_))
case None => FutureUtil.none
}
case _: DataPayload =>
//peer must be fully initialized to send a data payload
val msg =
s"Cannot find peerOpt=${sendToPeer.peerOpt} to send message=${sendToPeer.msg.payload.commandName} to. It may have been disconnected, sending to another random peer."
logger.warn(msg)
randomPeerMsgSenderWithService(ServiceIdentifier.NODE_NETWORK)
}
}
case None =>
randomPeerMsgSenderWithService(ServiceIdentifier.NODE_NETWORK)
}
peerMsgSenderOptF.flatMap {
case Some(peerMsgSender) =>
peerMsgSender
.sendMsg(sendToPeer.msg)
case None =>
Future.failed(new RuntimeException(
s"Unable to find peer message sender to send msg=${sendToPeer.msg.header.commandName} to. This means we are not connected to any peers."))
}
}
/** Gossips the given message to all peers except the excluded peer. If None given as excluded peer, gossip message to all peers */
@ -711,53 +749,6 @@ case class PeerManager(
StreamDataMessageWrapper,
Future[DataMessageHandler]] = {
Sink.foldAsync(initDmh) {
case (dmh, sendToPeer: SendToPeer) =>
logger.debug(
s"Sending message ${sendToPeer.msg.payload.commandName} to peerOpt=${sendToPeer.peerOpt}")
val peerMsgSenderOptF: Future[Option[PeerMessageSender]] =
sendToPeer.peerOpt match {
case Some(peer) =>
getPeerMsgSender(peer).flatMap {
case Some(peerMsgSender) =>
Future.successful(Some(peerMsgSender))
case None =>
sendToPeer.msg.payload match {
case _: ControlPayload =>
//peer may not be fully initialized, we may be doing the handshake with a peer
finderOpt.get
.getData(peer)
.map(_.peerMessageSender) match {
case Some(p) => p.map(Some(_))
case None => FutureUtil.none
}
case _: DataPayload =>
//peer must be fully initialized to send a data payload
val msg =
s"Cannot find peerOpt=${sendToPeer.peerOpt} to send message=${sendToPeer.msg.payload.commandName} to. It may have been disconnected, sending to another random peer."
logger.warn(msg)
randomPeerMsgSenderWithService(
ServiceIdentifier.NODE_NETWORK)
}
}
case None =>
dmh.state match {
case s: SyncDataMessageHandlerState =>
getPeerMsgSender(s.syncPeer)
case DoneSyncing | _: MisbehavingPeer | _: RemovePeers =>
//pick a random peer to sync with
randomPeerMsgSenderWithService(ServiceIdentifier.NODE_NETWORK)
}
}
peerMsgSenderOptF.flatMap {
case Some(peerMsgSender) =>
peerMsgSender
.sendMsg(sendToPeer.msg)
.map(_ => handleDisconnectedPeer(sendToPeer, peerMsgSender, dmh))
case None =>
Future.failed(new RuntimeException(
s"Unable to find peer message sender to send msg=${sendToPeer.msg.header.commandName} to. This means we are not connected to any peers."))
}
case (dmh, DataMessageWrapper(payload, peer)) =>
logger.debug(s"Got ${payload.commandName} from peer=${peer} in stream")
val peerMsgSenderOptF = getPeerMsgSender(peer)
@ -1011,38 +1002,6 @@ case class PeerManager(
_ <- syncHelper(syncPeerOpt)
} yield syncPeerOpt
}
/** Handles the case where we need to send a message to a peer, but that peer was disconnected
* We change the peer and the adjust the state in [[DataMessageHandler]]
* @param sendToPeer the peer we were originally sending the message to
* @param peerMessageSender the new peer we are going to send the message to
* @param dmh the data message handler we need to adjust state of
*/
private def handleDisconnectedPeer(
sendToPeer: SendToPeer,
peerMessageSender: PeerMessageSender,
dmh: DataMessageHandler): DataMessageHandler = {
val destination = peerMessageSender.peer
val newState: DataMessageHandlerState = sendToPeer.peerOpt match {
case Some(originalPeer) =>
if (originalPeer != destination) {
//need to replace syncPeer with newSyncPeer
dmh.state match {
case s: SyncDataMessageHandlerState =>
s.replaceSyncPeer(destination)
case m: MisbehavingPeer => m
case r: RemovePeers => r
case DoneSyncing => DoneSyncing
}
} else {
dmh.state
}
case None =>
dmh.state
}
dmh.copy(state = newState)
}
}
case class ResponseTimeout(payload: NetworkPayload)

View file

@ -898,4 +898,3 @@ case class SendResponseTimeout(peer: Peer, payload: NetworkPayload)
extends StreamDataMessageWrapper
case class SendToPeer(msg: NetworkMessage, peerOpt: Option[Peer])
extends StreamDataMessageWrapper