mirror of
https://github.com/bitcoin-s/bitcoin-s.git
synced 2025-03-26 21:42:48 +01:00
Remove PeerMessageReceiver
(#5182)
* Remove PeerMessageReceiver * Dont use QueueOfferResult, use akka.Done
This commit is contained in:
parent
c238191209
commit
b13e0565af
6 changed files with 52 additions and 167 deletions
node/src/main/scala/org/bitcoins/node
testkit/src/main/scala/org/bitcoins/testkit/node
|
@ -2,6 +2,7 @@ package org.bitcoins.node
|
|||
|
||||
import org.bitcoins.core.api.node.Peer
|
||||
import org.bitcoins.core.p2p.{
|
||||
ControlPayload,
|
||||
DataPayload,
|
||||
ExpectsResponse,
|
||||
NetworkMessage,
|
||||
|
@ -15,6 +16,9 @@ object NodeStreamMessage {
|
|||
case class DataMessageWrapper(payload: DataPayload, peer: Peer)
|
||||
extends NodeStreamMessage
|
||||
|
||||
case class ControlMessageWrapper(payload: ControlPayload, peer: Peer)
|
||||
extends NodeStreamMessage
|
||||
|
||||
case class HeaderTimeoutWrapper(peer: Peer) extends NodeStreamMessage
|
||||
|
||||
case class DisconnectedPeer(peer: Peer, forceReconnect: Boolean)
|
||||
|
|
|
@ -27,17 +27,12 @@ sealed trait PeerData {
|
|||
|
||||
def peerMessageSenderApi: PeerMessageSenderApi
|
||||
|
||||
private val initPeerMessageRecv = PeerMessageReceiver(controlMessageHandler =
|
||||
controlMessageHandler,
|
||||
queue = queue,
|
||||
peer = peer)
|
||||
|
||||
def stop(): Future[Unit] = {
|
||||
peerMessageSender.disconnect()
|
||||
}
|
||||
|
||||
val peerMessageSender: PeerMessageSender = {
|
||||
PeerMessageSender(peer, initPeerMessageRecv, peerMessageSenderApi)
|
||||
PeerMessageSender(peer, queue, peerMessageSenderApi)
|
||||
}
|
||||
|
||||
private[this] var _serviceIdentifier: Option[ServiceIdentifier] = None
|
||||
|
|
|
@ -804,6 +804,11 @@ case class PeerManager(
|
|||
r
|
||||
}
|
||||
}
|
||||
case (dmh, ControlMessageWrapper(payload, peer)) =>
|
||||
val controlMessageHandler = ControlMessageHandler(this)
|
||||
controlMessageHandler.handleControlPayload(payload, peer).map { _ =>
|
||||
dmh
|
||||
}
|
||||
case (dmh, HeaderTimeoutWrapper(peer)) =>
|
||||
logger.debug(s"Processing timeout header for $peer")
|
||||
for {
|
||||
|
|
|
@ -1,115 +0,0 @@
|
|||
package org.bitcoins.node.networking.peer
|
||||
|
||||
import akka.actor.{ActorSystem, Cancellable}
|
||||
import akka.stream.QueueOfferResult
|
||||
import akka.stream.scaladsl.SourceQueueWithComplete
|
||||
import org.bitcoins.core.api.node.{NodeType, Peer}
|
||||
import org.bitcoins.core.p2p._
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
import org.bitcoins.node.{NodeStreamMessage, P2PLogger}
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
/** Responsible for receiving messages from a peer on the
|
||||
* p2p network. This is called by [[org.bitcoins.rpc.client.common.Client Client]] when doing the p2p
|
||||
* handshake and during the [[PeerMessageReceiverState.Normal Normal]]
|
||||
* operations. This is the entry point for handling all received
|
||||
* [[org.bitcoins.core.p2p.NetworkMessage NetworkMessage]]
|
||||
*/
|
||||
case class PeerMessageReceiver(
|
||||
controlMessageHandler: ControlMessageHandler,
|
||||
queue: SourceQueueWithComplete[NodeStreamMessage],
|
||||
peer: Peer
|
||||
)(implicit system: ActorSystem, nodeAppConfig: NodeAppConfig)
|
||||
extends P2PLogger {
|
||||
import system.dispatcher
|
||||
|
||||
require(nodeAppConfig.nodeType != NodeType.BitcoindBackend,
|
||||
"Bitcoind should handle the P2P interactions")
|
||||
|
||||
def handleNetworkMessageReceived(
|
||||
networkMsgRecv: PeerMessageReceiver.NetworkMessageReceived): Future[
|
||||
PeerMessageReceiver] = {
|
||||
|
||||
val peer = networkMsgRecv.peer
|
||||
|
||||
logger.debug(
|
||||
s"Received message=${networkMsgRecv.msg.header.commandName} from peer=${peer}")
|
||||
|
||||
networkMsgRecv.msg.payload match {
|
||||
case controlPayload: ControlPayload =>
|
||||
handleControlPayload(payload = controlPayload)
|
||||
.map(_ => this)
|
||||
case dataPayload: DataPayload =>
|
||||
handleDataPayload(payload = dataPayload)
|
||||
.map(_ => this)
|
||||
}
|
||||
}
|
||||
|
||||
/** Handles a [[DataPayload]] message. It checks if the sender is the parent
|
||||
* actor, it sends it to our peer on the network. If the sender was the
|
||||
* peer on the network, forward to the actor that spawned our actor
|
||||
*
|
||||
* @param payload
|
||||
* @param sender
|
||||
*/
|
||||
private def handleDataPayload(
|
||||
payload: DataPayload): Future[QueueOfferResult] = {
|
||||
//else it means we are receiving this data payload from a peer,
|
||||
//we need to handle it
|
||||
val wrapper = NodeStreamMessage.DataMessageWrapper(payload, peer)
|
||||
|
||||
queue.offer(wrapper)
|
||||
}
|
||||
|
||||
/** Handles control payloads defined here https://bitcoin.org/en/developer-reference#control-messages
|
||||
*
|
||||
* @param payload the payload we need to do something with
|
||||
* @param sender the [[PeerMessageSender]] we can use to initialize an subsequent messages that need to be sent
|
||||
* @return the requests with the request removed for which the @payload is responding too
|
||||
*/
|
||||
private def handleControlPayload(payload: ControlPayload): Future[Unit] = {
|
||||
controlMessageHandler
|
||||
.handleControlPayload(payload, peer)
|
||||
}
|
||||
|
||||
/** This method is called when we have received
|
||||
* a [[akka.io.Tcp.Connected]] message from our peer
|
||||
* This means we have opened a Tcp connection,
|
||||
* but have NOT started the handshake
|
||||
* This method will initiate the handshake
|
||||
*/
|
||||
protected[networking] def connect(peer: Peer)(implicit
|
||||
system: ActorSystem,
|
||||
nodeAppConfig: NodeAppConfig): Cancellable = {
|
||||
import system.dispatcher
|
||||
val initializationTimeoutCancellable =
|
||||
system.scheduler.scheduleOnce(nodeAppConfig.initializationTimeout) {
|
||||
val offerF =
|
||||
queue.offer(NodeStreamMessage.InitializationTimeout(peer))
|
||||
offerF.failed.foreach(err =>
|
||||
logger.error(s"Failed to offer initialize timeout for peer=$peer",
|
||||
err))
|
||||
}
|
||||
|
||||
initializationTimeoutCancellable
|
||||
}
|
||||
|
||||
protected[networking] def disconnect(peer: Peer)(implicit
|
||||
system: ActorSystem): Future[PeerMessageReceiver] = {
|
||||
import system.dispatcher
|
||||
logger.debug(s"Disconnecting peer=$peer with internalstate=${this}")
|
||||
val disconnectedPeer = NodeStreamMessage.DisconnectedPeer(peer, false)
|
||||
for {
|
||||
_ <- queue.offer(disconnectedPeer)
|
||||
} yield this
|
||||
}
|
||||
}
|
||||
|
||||
object PeerMessageReceiver {
|
||||
|
||||
sealed abstract class PeerMessageReceiverMsg
|
||||
|
||||
case class NetworkMessageReceived(msg: NetworkMessage, peer: Peer)
|
||||
extends PeerMessageReceiverMsg
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
package org.bitcoins.node.networking.peer
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.{Done, NotUsed}
|
||||
import akka.actor.{ActorSystem, Cancellable}
|
||||
import akka.event.Logging
|
||||
import akka.io.Inet.SocketOption
|
||||
|
@ -13,9 +13,15 @@ import akka.stream.scaladsl.{
|
|||
RunnableGraph,
|
||||
Sink,
|
||||
Source,
|
||||
SourceQueueWithComplete,
|
||||
Tcp
|
||||
}
|
||||
import akka.stream.{Attributes, KillSwitches, UniqueKillSwitch}
|
||||
import akka.stream.{
|
||||
Attributes,
|
||||
KillSwitches,
|
||||
QueueOfferResult,
|
||||
UniqueKillSwitch
|
||||
}
|
||||
import akka.util.ByteString
|
||||
import org.bitcoins.chain.blockchain.ChainHandler
|
||||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
|
@ -24,24 +30,23 @@ import org.bitcoins.core.number.Int32
|
|||
import org.bitcoins.core.p2p._
|
||||
import org.bitcoins.core.util.{FutureUtil, NetworkUtil}
|
||||
import org.bitcoins.node.NodeStreamMessage.DisconnectedPeer
|
||||
import org.bitcoins.node.P2PLogger
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
import org.bitcoins.node.constant.NodeConstants
|
||||
import org.bitcoins.node.networking.peer.PeerMessageReceiver.NetworkMessageReceived
|
||||
import org.bitcoins.node.networking.peer.PeerMessageSender.ConnectionGraph
|
||||
import org.bitcoins.node.util.PeerMessageSenderApi
|
||||
import org.bitcoins.node.{NodeStreamMessage, P2PLogger}
|
||||
import org.bitcoins.tor.Socks5Connection
|
||||
import scodec.bits.ByteVector
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import java.time.Instant
|
||||
import java.time.temporal.ChronoUnit
|
||||
import scala.concurrent.{Future, Promise}
|
||||
import scala.concurrent.duration.{DurationInt, FiniteDuration}
|
||||
import scala.concurrent.{Future, Promise}
|
||||
|
||||
case class PeerMessageSender(
|
||||
peer: Peer,
|
||||
initPeerMessageRecv: PeerMessageReceiver,
|
||||
queue: SourceQueueWithComplete[NodeStreamMessage],
|
||||
peerMessageSenderApi: PeerMessageSenderApi)(implicit
|
||||
nodeAppConfig: NodeAppConfig,
|
||||
chainAppConfig: ChainAppConfig,
|
||||
|
@ -166,9 +171,8 @@ case class PeerMessageSender(
|
|||
private def connectionGraph(
|
||||
handleNetworkMsgSink: Sink[
|
||||
Vector[NetworkMessage],
|
||||
Future[PeerMessageReceiver]]): RunnableGraph[(
|
||||
(Future[Tcp.OutgoingConnection], UniqueKillSwitch),
|
||||
Future[PeerMessageReceiver])] = {
|
||||
Future[Done]]): RunnableGraph[
|
||||
((Future[Tcp.OutgoingConnection], UniqueKillSwitch), Future[Done])] = {
|
||||
val result = mergeHubSource
|
||||
.viaMat(connectionFlow)(Keep.right)
|
||||
.toMat(handleNetworkMsgSink)(Keep.both)
|
||||
|
@ -176,21 +180,26 @@ case class PeerMessageSender(
|
|||
result
|
||||
}
|
||||
|
||||
private def buildConnectionGraph(): RunnableGraph[(
|
||||
(Future[Tcp.OutgoingConnection], UniqueKillSwitch),
|
||||
Future[PeerMessageReceiver])] = {
|
||||
private def buildConnectionGraph(): RunnableGraph[
|
||||
((Future[Tcp.OutgoingConnection], UniqueKillSwitch), Future[Done])] = {
|
||||
|
||||
val handleNetworkMsgSink: Sink[Vector[NetworkMessage], Future[Done]] = {
|
||||
|
||||
val handleNetworkMsgSink: Sink[
|
||||
Vector[NetworkMessage],
|
||||
Future[PeerMessageReceiver]] = {
|
||||
Flow[Vector[NetworkMessage]]
|
||||
.foldAsync(initPeerMessageRecv) { case (peerMsgRecv, msgs) =>
|
||||
FutureUtil.foldLeftAsync(peerMsgRecv, msgs) { case (p, msg) =>
|
||||
p.handleNetworkMessageReceived(networkMsgRecv =
|
||||
NetworkMessageReceived(msg, peer))
|
||||
.mapAsync(1) { case msgs =>
|
||||
FutureUtil.foldLeftAsync[QueueOfferResult, NetworkMessage](
|
||||
QueueOfferResult.Enqueued,
|
||||
msgs) { case (_, msg) =>
|
||||
val wrapper = msg.payload match {
|
||||
case c: ControlPayload =>
|
||||
NodeStreamMessage.ControlMessageWrapper(c, peer)
|
||||
case d: DataPayload =>
|
||||
NodeStreamMessage.DataMessageWrapper(d, peer)
|
||||
}
|
||||
queue.offer(wrapper)
|
||||
}
|
||||
}
|
||||
.toMat(Sink.last)(Keep.right)
|
||||
.toMat(Sink.ignore)(Keep.right)
|
||||
}
|
||||
|
||||
connectionGraph(handleNetworkMsgSink)
|
||||
|
@ -213,7 +222,14 @@ case class PeerMessageSender(
|
|||
buildConnectionGraph().run()
|
||||
}
|
||||
|
||||
val initializationCancellable = initPeerMessageRecv.connect(peer)
|
||||
val initializationCancellable =
|
||||
system.scheduler.scheduleOnce(nodeAppConfig.initializationTimeout) {
|
||||
val offerF =
|
||||
queue.offer(NodeStreamMessage.InitializationTimeout(peer))
|
||||
offerF.failed.foreach(err =>
|
||||
logger.error(s"Failed to offer initialize timeout for peer=$peer",
|
||||
err))
|
||||
}
|
||||
|
||||
outgoingConnectionF.onComplete {
|
||||
case scala.util.Success(o) =>
|
||||
|
@ -252,13 +268,14 @@ case class PeerMessageSender(
|
|||
|
||||
val _ = graph.streamDoneF
|
||||
.onComplete {
|
||||
case scala.util.Success(p) =>
|
||||
p.disconnect(peer)
|
||||
case scala.util.Success(_) =>
|
||||
val disconnectedPeer = DisconnectedPeer(peer, false)
|
||||
queue.offer(disconnectedPeer)
|
||||
case scala.util.Failure(err) =>
|
||||
logger.info(
|
||||
s"Connection with peer=$peer failed with err=${err.getMessage}")
|
||||
val disconnectedPeer = DisconnectedPeer(peer, false)
|
||||
initPeerMessageRecv.queue.offer(disconnectedPeer)
|
||||
queue.offer(disconnectedPeer)
|
||||
}
|
||||
|
||||
resultF.map(_ => ())
|
||||
|
@ -397,7 +414,7 @@ object PeerMessageSender {
|
|||
case class ConnectionGraph(
|
||||
mergeHubSink: Sink[ByteString, NotUsed],
|
||||
connectionF: Future[Tcp.OutgoingConnection],
|
||||
streamDoneF: Future[PeerMessageReceiver],
|
||||
streamDoneF: Future[Done],
|
||||
killswitch: UniqueKillSwitch)
|
||||
|
||||
}
|
||||
|
|
|
@ -6,7 +6,6 @@ import org.bitcoins.chain.config.ChainAppConfig
|
|||
import org.bitcoins.core.api.node.{NodeType, Peer}
|
||||
import org.bitcoins.node._
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
import org.bitcoins.node.networking.peer._
|
||||
import org.bitcoins.rpc.client.common.BitcoindVersion.V22
|
||||
import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion}
|
||||
import org.bitcoins.rpc.client.v22.BitcoindV22RpcClient
|
||||
|
@ -336,26 +335,6 @@ object NodeUnitTest extends P2PLogger {
|
|||
destroyedF
|
||||
}
|
||||
|
||||
def buildPeerMessageReceiver(
|
||||
peer: Peer,
|
||||
walletCreationTimeOpt: Option[Instant])(implicit
|
||||
nodeAppConfig: NodeAppConfig,
|
||||
chainAppConfig: ChainAppConfig,
|
||||
system: ActorSystem): Future[PeerMessageReceiver] = {
|
||||
import system.dispatcher
|
||||
val nodeF = buildNode(peer, walletCreationTimeOpt)
|
||||
for {
|
||||
node <- nodeF
|
||||
controlMessageHandler =
|
||||
ControlMessageHandler(node.peerManager)(system.dispatcher,
|
||||
nodeAppConfig)
|
||||
receiver =
|
||||
PeerMessageReceiver(controlMessageHandler = controlMessageHandler,
|
||||
queue = node.peerManager.dataMessageQueueOpt.get,
|
||||
peer = peer)
|
||||
} yield receiver
|
||||
}
|
||||
|
||||
def createPeer(bitcoind: BitcoindRpcClient)(implicit
|
||||
executionContext: ExecutionContext): Future[Peer] = {
|
||||
NodeTestUtil.getBitcoindPeer(bitcoind)
|
||||
|
|
Loading…
Add table
Reference in a new issue