2023 04 17 refactor peer message receiver (#5045)

* WIP: Refactor PeerMessageReceiver methods into PeerMessageReceiverState

* Remove state helper methods from PeerMessageReceiver

* WIP: Remove PeerMessageReceiverState from PeerMessageReceiver

* Get things compiling and tests passing

* Remove currentPeerMessageHandlerReceiver

* Refactor PeerMessageReceiverTest to not use bitcoind
This commit is contained in:
Chris Stewart 2023-04-18 10:25:24 -05:00 committed by GitHub
parent 25cdf9d880
commit 51429a7d68
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 506 additions and 484 deletions

View file

@ -5,7 +5,10 @@ import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient.ConnectCommand import org.bitcoins.node.networking.P2PClient.ConnectCommand
import org.bitcoins.node.networking.peer.PeerMessageReceiver import org.bitcoins.node.networking.peer.{
PeerMessageReceiver,
PeerMessageReceiverState
}
import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.async.TestAsyncUtil import org.bitcoins.testkit.async.TestAsyncUtil
import org.bitcoins.testkit.fixtures.BitcoinSAppConfigBitcoinFixtureStarted import org.bitcoins.testkit.fixtures.BitcoinSAppConfigBitcoinFixtureStarted
@ -97,16 +100,22 @@ class P2PClientActorTest
val peerMessageReceiverF = val peerMessageReceiverF =
for { for {
node <- NodeUnitTest.buildNode(peer, None) node <- NodeUnitTest.buildNode(peer, None)
} yield PeerMessageReceiver.preConnection(peer, node) } yield PeerMessageReceiver(node, peer)
val clientActorF: Future[TestActorRef[P2PClientActor]] = val clientActorF: Future[TestActorRef[P2PClientActor]] =
peerMessageReceiverF.map { peerMsgRecv => peerMessageReceiverF.map { peerMsgRecv =>
TestActorRef( TestActorRef(
P2PClient.props(peer = peer, P2PClient.props(
peerMsgHandlerReceiver = peerMsgRecv, peer = peer,
onReconnect = (_: Peer) => Future.unit, peerMsgHandlerReceiver = peerMsgRecv,
onStop = (_: Peer) => Future.unit, peerMsgRecvState = PeerMessageReceiverState.fresh(),
maxReconnectionTries = 16), onReconnect = (_: Peer) => Future.unit,
onStop = (_: Peer) => Future.unit,
onInitializationTimeout = (_: Peer) => Future.unit,
onQueryTimeout = (_, _) => Future.unit,
sendResponseTimeout = (_, _) => Future.unit,
maxReconnectionTries = 16
),
probe.ref probe.ref
) )
} }

View file

@ -7,128 +7,92 @@ import org.bitcoins.core.p2p.{InetAddress, VerAckMessage, VersionMessage}
import org.bitcoins.node.constant.NodeConstants import org.bitcoins.node.constant.NodeConstants
import org.bitcoins.node.models.Peer import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient import org.bitcoins.node.networking.P2PClient
import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.testkit.util.BitcoinSAsyncTest
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.node.NodeTestWithCachedBitcoindNewest
import org.bitcoins.testkit.node.fixture.NeutrinoNodeConnectedWithBitcoind
import org.bitcoins.testkit.util.TorUtil
import org.scalatest.{FutureOutcome, Outcome}
import java.net.InetSocketAddress import java.net.InetSocketAddress
import scala.concurrent.{Future, Promise} import scala.concurrent.{Future, Promise}
class PeerMessageReceiverTest extends NodeTestWithCachedBitcoindNewest { class PeerMessageReceiverTest extends BitcoinSAsyncTest {
/** Wallet config with data directory set to user temp directory */
override protected def getFreshConfig: BitcoinSAppConfig =
BitcoinSTestAppConfig.getMultiPeerNeutrinoWithEmbeddedDbTestConfig(
pgUrl,
Vector.empty)
override type FixtureParam = NeutrinoNodeConnectedWithBitcoind
override def withFixture(test: OneArgAsyncTest): FutureOutcome = {
val torClientF = if (TorUtil.torEnabled) torF else Future.unit
val outcomeF: Future[Outcome] = for {
_ <- torClientF
bitcoind <- cachedBitcoindWithFundsF
outcome = withNeutrinoNodeConnectedToBitcoindCached(test, bitcoind)(
system,
getFreshConfig)
f <- outcome.toFuture
} yield f
new FutureOutcome(outcomeF)
}
behavior of "PeerMessageReceiverTest" behavior of "PeerMessageReceiverTest"
it must "change a peer message receiver to be disconnected" in { it must "change a peer message receiver to be disconnected" in {
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoind => val socket = InetSocketAddress.createUnresolved("google.com", 12345)
val node = nodeConnectedWithBitcoind.node val peer = Peer(socket, None, None)
val socket = InetSocketAddress.createUnresolved("google.com", 12345) val client = P2PClient(ActorRef.noSender, peer)
val peer = Peer(socket, None, None) val clientP = Promise[P2PClient]()
val client = P2PClient(ActorRef.noSender, peer) clientP.success(client)
val clientP = Promise[P2PClient]()
clientP.success(client)
val versionMsgP = Promise[VersionMessage]() val versionMsgP = Promise[VersionMessage]()
val localhost = java.net.InetAddress.getLocalHost val localhost = java.net.InetAddress.getLocalHost
val versionMsg = VersionMessage(RegTest, val versionMsg = VersionMessage(RegTest,
NodeConstants.userAgent, NodeConstants.userAgent,
Int32.one, Int32.one,
InetAddress(localhost.getAddress), InetAddress(localhost.getAddress),
InetAddress(localhost.getAddress), InetAddress(localhost.getAddress),
false) false)
versionMsgP.success(versionMsg) versionMsgP.success(versionMsg)
val verackMsgP = Promise[VerAckMessage.type]() val verackMsgP = Promise[VerAckMessage.type]()
verackMsgP.success(VerAckMessage) verackMsgP.success(VerAckMessage)
val normal = PeerMessageReceiverState.Normal(clientConnectP = clientP, val normal = PeerMessageReceiverState.Normal(clientConnectP = clientP,
clientDisconnectP = clientDisconnectP =
Promise[Unit](), Promise[Unit](),
versionMsgP = versionMsgP, versionMsgP = versionMsgP,
verackMsgP = verackMsgP) verackMsgP = verackMsgP)
val peerMsgReceiver = val newMsgReceiverStateF = normal.disconnect(peer, (_, _) => Future.unit)
PeerMessageReceiver(normal, node, peer)(system, node.nodeAppConfig)
val newMsgReceiverF = peerMsgReceiver.disconnect() newMsgReceiverStateF.map { newMsgReceiverState =>
assert(
newMsgReceiverF.map { newMsgReceiver => newMsgReceiverState
assert( .isInstanceOf[PeerMessageReceiverState.Disconnected])
newMsgReceiver.state assert(newMsgReceiverState.isDisconnected)
.isInstanceOf[PeerMessageReceiverState.Disconnected]) }
assert(newMsgReceiver.isDisconnected)
}
} }
it must "change a peer message receiver to be initializing disconnect" in { it must "change a peer message receiver to be initializing disconnect" in {
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoind => val socket = InetSocketAddress.createUnresolved("google.com", 12345)
val node = nodeConnectedWithBitcoind.node val peer = Peer(socket, None, None)
val socket = InetSocketAddress.createUnresolved("google.com", 12345) val client = P2PClient(ActorRef.noSender, peer)
val peer = Peer(socket, None, None) val clientP = Promise[P2PClient]()
val client = P2PClient(ActorRef.noSender, peer) clientP.success(client)
val clientP = Promise[P2PClient]()
clientP.success(client)
val versionMsgP = Promise[VersionMessage]() val versionMsgP = Promise[VersionMessage]()
val localhost = java.net.InetAddress.getLocalHost val localhost = java.net.InetAddress.getLocalHost
val versionMsg = VersionMessage(RegTest, val versionMsg = VersionMessage(RegTest,
NodeConstants.userAgent, NodeConstants.userAgent,
Int32.one, Int32.one,
InetAddress(localhost.getAddress), InetAddress(localhost.getAddress),
InetAddress(localhost.getAddress), InetAddress(localhost.getAddress),
false) false)
versionMsgP.success(versionMsg) versionMsgP.success(versionMsg)
val verackMsgP = Promise[VerAckMessage.type]() val verackMsgP = Promise[VerAckMessage.type]()
verackMsgP.success(VerAckMessage) verackMsgP.success(VerAckMessage)
val normal = PeerMessageReceiverState.Normal(clientConnectP = clientP, val normal = PeerMessageReceiverState.Normal(clientConnectP = clientP,
clientDisconnectP = clientDisconnectP =
Promise[Unit](), Promise[Unit](),
versionMsgP = versionMsgP, versionMsgP = versionMsgP,
verackMsgP = verackMsgP) verackMsgP = verackMsgP)
val peerMsgReceiver = val newMsgReceiverState = normal.initializeDisconnect(peer)
PeerMessageReceiver(normal, node, peer)(system, node.nodeAppConfig)
val newMsgReceiver = peerMsgReceiver.initializeDisconnect() assert(
newMsgReceiverState
.isInstanceOf[PeerMessageReceiverState.InitializedDisconnect])
assert(!newMsgReceiverState.isDisconnected)
assert( newMsgReceiverState.disconnect(peer, (_, _) => Future.unit).map {
newMsgReceiver.state disconnectRecv =>
.isInstanceOf[PeerMessageReceiverState.InitializedDisconnect])
assert(!newMsgReceiver.isDisconnected)
newMsgReceiver.disconnect().map { disconnectRecv =>
assert( assert(
disconnectRecv.state disconnectRecv
.isInstanceOf[PeerMessageReceiverState.InitializedDisconnectDone]) .isInstanceOf[PeerMessageReceiverState.InitializedDisconnectDone])
assert(disconnectRecv.isDisconnected) assert(disconnectRecv.isDisconnected)
assert(disconnectRecv.state.clientDisconnectP.isCompleted) assert(disconnectRecv.clientDisconnectP.isCompleted)
} }
} }
} }

View file

@ -1,12 +1,14 @@
package org.bitcoins.node package org.bitcoins.node
import akka.actor.{ActorRef, ActorSystem} import akka.actor.{ActorRef, ActorSystem}
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.p2p.ServiceIdentifier import org.bitcoins.core.p2p.ServiceIdentifier
import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.networking.peer.{ import org.bitcoins.node.networking.peer.{
PeerMessageReceiver, PeerMessageReceiver,
PeerMessageReceiverState,
PeerMessageSender PeerMessageSender
} }
@ -19,7 +21,10 @@ case class PeerData(
peer: Peer, peer: Peer,
node: Node, node: Node,
supervisor: ActorRef supervisor: ActorRef
)(implicit system: ActorSystem, nodeAppConfig: NodeAppConfig) { )(implicit
system: ActorSystem,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig) {
import system.dispatcher import system.dispatcher
lazy val peerMessageSender: Future[PeerMessageSender] = { lazy val peerMessageSender: Future[PeerMessageSender] = {
@ -28,12 +33,16 @@ case class PeerData(
private lazy val client: Future[P2PClient] = { private lazy val client: Future[P2PClient] = {
val peerMessageReceiver = val peerMessageReceiver =
PeerMessageReceiver.newReceiver(node = node, peer = peer) PeerMessageReceiver(node, peer)
P2PClient( P2PClient(
peer = peer, peer = peer,
peerMessageReceiver = peerMessageReceiver, peerMessageReceiver = peerMessageReceiver,
peerMsgRecvState = PeerMessageReceiverState.fresh(),
onReconnect = node.peerManager.onReconnect, onReconnect = node.peerManager.onReconnect,
onStop = node.peerManager.onP2PClientStopped, onStop = node.peerManager.onP2PClientStopped,
onInitializationTimeout = node.peerManager.onInitializationTimeout,
node.peerManager.onQueryTimeout,
node.peerManager.sendResponseTimeout,
maxReconnectionTries = 4, maxReconnectionTries = 4,
supervisor = supervisor supervisor = supervisor
) )

View file

@ -3,6 +3,7 @@ package org.bitcoins.node
import akka.actor.{ActorRef, ActorSystem, Cancellable} import akka.actor.{ActorRef, ActorSystem, Cancellable}
import monix.execution.atomic.AtomicBoolean import monix.execution.atomic.AtomicBoolean
import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.p2p.ServiceIdentifier import org.bitcoins.core.p2p.ServiceIdentifier
import org.bitcoins.core.util.{NetworkUtil, StartStopAsync} import org.bitcoins.core.util.{NetworkUtil, StartStopAsync}
import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.config.NodeAppConfig
@ -22,7 +23,8 @@ case class PeerFinder(
supervisor: ActorRef)(implicit supervisor: ActorRef)(implicit
ec: ExecutionContext, ec: ExecutionContext,
system: ActorSystem, system: ActorSystem,
nodeAppConfig: NodeAppConfig) nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig)
extends StartStopAsync[PeerFinder] extends StartStopAsync[PeerFinder]
with P2PLogger { with P2PLogger {

View file

@ -4,20 +4,21 @@ import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
import akka.stream.OverflowStrategy import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete} import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}
import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.node.NodeType import org.bitcoins.core.api.node.NodeType
import org.bitcoins.core.p2p._ import org.bitcoins.core.p2p._
import org.bitcoins.core.util.{NetworkUtil, StartStopAsync} import org.bitcoins.core.util.{NetworkUtil, StartStopAsync}
import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.{Peer, PeerDAO, PeerDb} import org.bitcoins.node.models.{Peer, PeerDAO, PeerDb}
import org.bitcoins.node.networking.peer._ import org.bitcoins.node.networking.peer._
import org.bitcoins.node.networking.{P2PClientSupervisor} import org.bitcoins.node.networking.P2PClientSupervisor
import org.bitcoins.node.util.BitcoinSNodeUtil import org.bitcoins.node.util.BitcoinSNodeUtil
import scodec.bits.ByteVector import scodec.bits.ByteVector
import java.net.InetAddress import java.net.InetAddress
import java.time.Duration import java.time.Duration
import scala.collection.mutable import scala.collection.mutable
import scala.concurrent.duration.{DurationInt} import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future, Promise} import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Random import scala.util.Random
@ -26,7 +27,8 @@ case class PeerManager(
node: NeutrinoNode)(implicit node: NeutrinoNode)(implicit
ec: ExecutionContext, ec: ExecutionContext,
system: ActorSystem, system: ActorSystem,
nodeAppConfig: NodeAppConfig) nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig)
extends StartStopAsync[PeerManager] extends StartStopAsync[PeerManager]
with P2PLogger { with P2PLogger {

View file

@ -6,6 +6,7 @@ import akka.io.Tcp.SO.KeepAlive
import akka.io.{IO, Tcp} import akka.io.{IO, Tcp}
import akka.pattern.ask import akka.pattern.ask
import akka.util.{ByteString, CompactByteString, Timeout} import akka.util.{ByteString, CompactByteString, Timeout}
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.config.NetworkParameters import org.bitcoins.core.config.NetworkParameters
import org.bitcoins.core.p2p.{ import org.bitcoins.core.p2p.{
ExpectsResponse, ExpectsResponse,
@ -13,7 +14,7 @@ import org.bitcoins.core.p2p.{
NetworkMessage, NetworkMessage,
NetworkPayload NetworkPayload
} }
import org.bitcoins.core.util.{NetworkUtil} import org.bitcoins.core.util.NetworkUtil
import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient.{ import org.bitcoins.node.networking.P2PClient.{
@ -68,17 +69,21 @@ import scala.util._
*/ */
case class P2PClientActor( case class P2PClientActor(
peer: Peer, peer: Peer,
initPeerMsgHandlerReceiver: PeerMessageReceiver, peerMsgHandlerReceiver: PeerMessageReceiver,
initPeerMsgRecvState: PeerMessageReceiverState,
onReconnect: Peer => Future[Unit], onReconnect: Peer => Future[Unit],
onStop: Peer => Future[Unit], onStop: Peer => Future[Unit],
onInitializationTimeout: Peer => Future[Unit],
onQueryTimeout: (ExpectsResponse, Peer) => Future[Unit],
sendResponseTimeout: (Peer, NetworkPayload) => Future[Unit],
maxReconnectionTries: Int maxReconnectionTries: Int
)(implicit config: NodeAppConfig) )(implicit nodeAppConfig: NodeAppConfig, chainAppConfig: ChainAppConfig)
extends Actor extends Actor
with P2PLogger { with P2PLogger {
import context.dispatcher import context.dispatcher
private var currentPeerMsgHandlerRecv = initPeerMsgHandlerReceiver private var currentPeerMsgRecvState = initPeerMsgRecvState
private var reconnectHandlerOpt: Option[Peer => Future[Unit]] = None private var reconnectHandlerOpt: Option[Peer => Future[Unit]] = None
@ -92,7 +97,7 @@ case class P2PClientActor(
/** The parameters for the network we are connected to /** The parameters for the network we are connected to
*/ */
private val network: NetworkParameters = config.network private val network: NetworkParameters = nodeAppConfig.network
private val timeout = 60.seconds private val timeout = 60.seconds
@ -120,9 +125,12 @@ case class P2PClientActor(
} }
sendNetworkMessage(message, peerConnection) sendNetworkMessage(message, peerConnection)
case ResponseTimeout(msg) => case ResponseTimeout(msg) =>
currentPeerMsgHandlerRecv = currentPeerMsgRecvState = Await.result(
Await.result(currentPeerMsgHandlerRecv.onResponseTimeout(msg), currentPeerMsgRecvState.onResponseTimeout(msg,
timeout) peer,
onQueryTimeout =
onQueryTimeout),
timeout)
case payload: NetworkPayload => case payload: NetworkPayload =>
val networkMsg = NetworkMessage(network, payload) val networkMsg = NetworkMessage(network, payload)
self.forward(networkMsg) self.forward(networkMsg)
@ -145,20 +153,20 @@ case class P2PClientActor(
reconnect() reconnect()
case networkMessageReceived: NetworkMessageReceived => case networkMessageReceived: NetworkMessageReceived =>
val newMsgReceiverF = val newMsgReceiverStateF =
handleReceivedMsgFn(currentPeerMsgHandlerRecv, networkMessageReceived) handleReceivedMsgFn(networkMessageReceived)
val newMsgReceiver = val newMsgReceiverState =
try { try {
Await.result(newMsgReceiverF, timeout) Await.result(newMsgReceiverStateF, timeout)
} catch { } catch {
case scala.util.control.NonFatal(err) => case scala.util.control.NonFatal(err) =>
logger.error( logger.error(
s"Failed to process message in time state=${currentPeerMsgHandlerRecv.state}, msgs=${networkMessageReceived.msg.header.commandName}", s"Failed to process message in time state=${currentPeerMsgRecvState}, msgs=${networkMessageReceived.msg.header.commandName}",
err) err)
throw err throw err
} }
currentPeerMsgHandlerRecv = newMsgReceiver currentPeerMsgRecvState = newMsgReceiverState
if (currentPeerMsgHandlerRecv.isInitialized) { if (currentPeerMsgRecvState.isInitialized) {
curReconnectionTry = 0 curReconnectionTry = 0
reconnectHandlerOpt.foreach(_.apply(peer)) reconnectHandlerOpt.foreach(_.apply(peer))
reconnectHandlerOpt = None reconnectHandlerOpt = None
@ -325,18 +333,18 @@ case class P2PClientActor(
(peer.socket, None) (peer.socket, None)
} }
manager ! Tcp.Connect(peerOrProxyAddress, manager ! Tcp.Connect(peerOrProxyAddress,
timeout = Some(config.connectionTimeout), timeout = Some(nodeAppConfig.connectionTimeout),
options = KeepAlive(true) :: Nil, options = KeepAlive(true) :: Nil,
pullMode = true) pullMode = true)
context become connecting(proxyParams) context become connecting(proxyParams)
} }
private def reconnect(): Unit = { private def reconnect(): Unit = {
currentPeerMsgHandlerRecv.state match { currentPeerMsgRecvState match {
case _: PeerMessageReceiverState.InitializedDisconnect | case _: PeerMessageReceiverState.InitializedDisconnect |
_: PeerMessageReceiverState.InitializedDisconnectDone => _: PeerMessageReceiverState.InitializedDisconnectDone =>
logger.debug( logger.debug(
s"Ignoring reconnection attempts as we initialized disconnect from peer=$peer, state=${currentPeerMsgHandlerRecv.state}") s"Ignoring reconnection attempts as we initialized disconnect from peer=$peer, state=${currentPeerMsgRecvState}")
context.stop(self) context.stop(self)
case bad: PeerMessageReceiverState.StoppedReconnect => case bad: PeerMessageReceiverState.StoppedReconnect =>
throw new RuntimeException(s"Tried to reconnect when in state $bad") throw new RuntimeException(s"Tried to reconnect when in state $bad")
@ -344,8 +352,10 @@ case class P2PClientActor(
_: Normal | _: Disconnected | _: Waiting) => _: Normal | _: Disconnected | _: Waiting) =>
state match { state match {
case wait: Waiting => case wait: Waiting =>
currentPeerMsgHandlerRecv = Await.result( currentPeerMsgRecvState = Await.result(
currentPeerMsgHandlerRecv.onResponseTimeout(wait.responseFor), currentPeerMsgRecvState.onResponseTimeout(wait.responseFor,
peer,
onQueryTimeout),
timeout) timeout)
case init: Initializing => case init: Initializing =>
init.initializationTimeoutCancellable.cancel() init.initializationTimeoutCancellable.cancel()
@ -353,8 +363,7 @@ case class P2PClientActor(
} }
logger.debug( logger.debug(
s"Attempting to reconnect to peer=$peer, tryCount=$reconnectionTry, previous state=${currentPeerMsgHandlerRecv.state}") s"Attempting to reconnect to peer=$peer, tryCount=$reconnectionTry, previous state=${currentPeerMsgRecvState}")
currentPeerMsgHandlerRecv = initPeerMsgHandlerReceiver
if (reconnectionTry >= maxReconnectionTries) { if (reconnectionTry >= maxReconnectionTries) {
logger.debug("Exceeded maximum number of reconnection attempts") logger.debug("Exceeded maximum number of reconnection attempts")
@ -399,25 +408,32 @@ case class P2PClientActor(
//our bitcoin peer will send all messages to this actor. //our bitcoin peer will send all messages to this actor.
peerConnection ! Tcp.Register(self) peerConnection ! Tcp.Register(self)
peerConnection ! Tcp.ResumeReading peerConnection ! Tcp.ResumeReading
val client = P2PClient(self, peer)
currentPeerMsgHandlerRecv = currentPeerMsgRecvState =
currentPeerMsgHandlerRecv.connect(P2PClient(self, peer)) currentPeerMsgRecvState.connect(client, onInitializationTimeout)(
context.system,
nodeAppConfig,
chainAppConfig)
context.become(awaitNetworkRequest(peerConnection, unalignedBytes)) context.become(awaitNetworkRequest(peerConnection, unalignedBytes))
unalignedBytes unalignedBytes
case Tcp.ErrorClosed(cause) => case Tcp.ErrorClosed(cause) =>
logger.debug( logger.debug(
s"An error occurred in our connection with $peer, cause=$cause state=${currentPeerMsgHandlerRecv.state}") s"An error occurred in our connection with $peer, cause=$cause state=${currentPeerMsgRecvState}")
currentPeerMsgHandlerRecv = currentPeerMsgRecvState = Await.result(
Await.result(currentPeerMsgHandlerRecv.disconnect(), timeout) currentPeerMsgRecvState.disconnect(peer, onQueryTimeout)(
context.system),
timeout)
context.stop(self) context.stop(self)
unalignedBytes unalignedBytes
case closeCmd @ (Tcp.ConfirmedClosed | Tcp.Closed | Tcp.Aborted | case closeCmd @ (Tcp.ConfirmedClosed | Tcp.Closed | Tcp.Aborted |
Tcp.PeerClosed) => Tcp.PeerClosed) =>
logger.info( logger.info(
s"We've been disconnected by $peer command=${closeCmd} state=${currentPeerMsgHandlerRecv.state}") s"We've been disconnected by $peer command=${closeCmd} state=${currentPeerMsgRecvState}")
currentPeerMsgHandlerRecv = currentPeerMsgRecvState = Await.result(
Await.result(currentPeerMsgHandlerRecv.disconnect(), timeout) currentPeerMsgRecvState.disconnect(peer, onQueryTimeout)(
context.system),
timeout)
context.stop(self) context.stop(self)
unalignedBytes unalignedBytes
@ -463,33 +479,33 @@ case class P2PClientActor(
} }
} }
private val handleReceivedMsgFn: ( private val handleReceivedMsgFn: NetworkMessageReceived => Future[
PeerMessageReceiver, PeerMessageReceiverState] = { case msg: NetworkMessageReceived =>
NetworkMessageReceived) => Future[PeerMessageReceiver] = { val resultF = if (currentPeerMsgRecvState.isConnected) {
case (peerMsgRecv: PeerMessageReceiver, msg: NetworkMessageReceived) => currentPeerMsgRecvState match {
val resultF = if (peerMsgRecv.isConnected) { case _ @(_: Normal | _: Waiting | Preconnection | _: Initializing) =>
currentPeerMsgHandlerRecv.state match { peerMsgHandlerReceiver.handleNetworkMessageReceived(
case _ @(_: Normal | _: Waiting | Preconnection | _: Initializing) => msg,
peerMsgRecv.handleNetworkMessageReceived(msg) currentPeerMsgRecvState)
case _: Disconnected | _: InitializedDisconnectDone | case _: Disconnected | _: InitializedDisconnectDone |
_: InitializedDisconnect | _: StoppedReconnect => _: InitializedDisconnect | _: StoppedReconnect =>
logger.debug( logger.debug(
s"Ignoring ${msg.msg.payload.commandName} from $peer as in state=${currentPeerMsgHandlerRecv.state}") s"Ignoring ${msg.msg.payload.commandName} from $peer as in state=${currentPeerMsgRecvState}")
Future.successful(peerMsgRecv) Future.successful(currentPeerMsgRecvState)
}
} else {
Future.successful(peerMsgRecv)
} }
resultF } else {
Future.successful(currentPeerMsgRecvState)
}
resultF
} }
/** Returns the current state of our peer given the [[P2PClient.MetaMsg meta message]] /** Returns the current state of our peer given the [[P2PClient.MetaMsg meta message]]
*/ */
private def handleMetaMsg(metaMsg: P2PClient.MetaMsg): Boolean = { private def handleMetaMsg(metaMsg: P2PClient.MetaMsg): Boolean = {
metaMsg match { metaMsg match {
case P2PClient.IsConnected => currentPeerMsgHandlerRecv.isConnected case P2PClient.IsConnected => currentPeerMsgRecvState.isConnected
case P2PClient.IsInitialized => currentPeerMsgHandlerRecv.isInitialized case P2PClient.IsInitialized => currentPeerMsgRecvState.isInitialized
case P2PClient.IsDisconnected => currentPeerMsgHandlerRecv.isDisconnected case P2PClient.IsDisconnected => currentPeerMsgRecvState.isDisconnected
} }
} }
@ -522,8 +538,8 @@ case class P2PClientActor(
logger.info(s"Disconnecting from peer $peer") logger.info(s"Disconnecting from peer $peer")
context become ignoreNetworkMessages(Some(peerConnection), context become ignoreNetworkMessages(Some(peerConnection),
ByteVector.empty) ByteVector.empty)
currentPeerMsgHandlerRecv = currentPeerMsgRecvState =
currentPeerMsgHandlerRecv.initializeDisconnect() currentPeerMsgRecvState.initializeDisconnect(peer)
peerConnection ! Tcp.Close peerConnection ! Tcp.Close
case None => case None =>
logger.warn( logger.warn(
@ -535,13 +551,13 @@ case class P2PClientActor(
case Some(peerConnection) => case Some(peerConnection) =>
context become ignoreNetworkMessages(Some(peerConnection), context become ignoreNetworkMessages(Some(peerConnection),
ByteVector.empty) ByteVector.empty)
currentPeerMsgHandlerRecv = currentPeerMsgRecvState =
currentPeerMsgHandlerRecv.initializeDisconnect() currentPeerMsgRecvState.initializeDisconnect(peer)
peerConnection ! Tcp.Close peerConnection ! Tcp.Close
case None => case None =>
context become ignoreNetworkMessages(None, ByteVector.empty) context become ignoreNetworkMessages(None, ByteVector.empty)
currentPeerMsgHandlerRecv = currentPeerMsgRecvState =
currentPeerMsgHandlerRecv.stopReconnect() currentPeerMsgRecvState.stopReconnect(peer)
context.stop(self) context.stop(self)
} }
} }
@ -559,9 +575,15 @@ case class P2PClientActor(
msg.isInstanceOf[ExpectsResponse], msg.isInstanceOf[ExpectsResponse],
s"Tried to wait for response to message which is not a query, got=$msg") s"Tried to wait for response to message which is not a query, got=$msg")
logger.debug(s"Expecting response for ${msg.commandName} for $peer") logger.debug(s"Expecting response for ${msg.commandName} for $peer")
currentPeerMsgHandlerRecv.handleExpectResponse(msg).map { newReceiver => currentPeerMsgRecvState
currentPeerMsgHandlerRecv = newReceiver .handleExpectResponse(
} msg = msg,
peer = peer,
sendResponseTimeout = sendResponseTimeout,
onQueryTimeout = onQueryTimeout)(context.system, nodeAppConfig)
.map { newReceiverState =>
currentPeerMsgRecvState = newReceiverState
}
} }
} }
@ -639,33 +661,57 @@ object P2PClient extends P2PLogger {
def props( def props(
peer: Peer, peer: Peer,
peerMsgHandlerReceiver: PeerMessageReceiver, peerMsgHandlerReceiver: PeerMessageReceiver,
peerMsgRecvState: PeerMessageReceiverState,
onReconnect: Peer => Future[Unit], onReconnect: Peer => Future[Unit],
onStop: Peer => Future[Unit], onStop: Peer => Future[Unit],
onInitializationTimeout: Peer => Future[Unit],
onQueryTimeout: (ExpectsResponse, Peer) => Future[Unit],
sendResponseTimeout: (Peer, NetworkPayload) => Future[Unit],
maxReconnectionTries: Int)(implicit maxReconnectionTries: Int)(implicit
config: NodeAppConfig nodeAppConfig: NodeAppConfig,
): Props = chainAppConfig: ChainAppConfig
Props(classOf[P2PClientActor], ): Props = {
peer, Props(
peerMsgHandlerReceiver, classOf[P2PClientActor],
onReconnect, peer,
onStop, peerMsgHandlerReceiver,
maxReconnectionTries, peerMsgRecvState,
config) onReconnect,
onStop,
onInitializationTimeout,
onQueryTimeout,
sendResponseTimeout,
maxReconnectionTries,
nodeAppConfig,
chainAppConfig
)
}
def apply( def apply(
peer: Peer, peer: Peer,
peerMessageReceiver: PeerMessageReceiver, peerMessageReceiver: PeerMessageReceiver,
peerMsgRecvState: PeerMessageReceiverState,
onReconnect: Peer => Future[Unit], onReconnect: Peer => Future[Unit],
onStop: Peer => Future[Unit], onStop: Peer => Future[Unit],
onInitializationTimeout: Peer => Future[Unit],
onQueryTimeout: (ExpectsResponse, Peer) => Future[Unit],
sendResponseTimeout: (Peer, NetworkPayload) => Future[Unit],
maxReconnectionTries: Int = 16, maxReconnectionTries: Int = 16,
supervisor: ActorRef)(implicit supervisor: ActorRef)(implicit
config: NodeAppConfig, nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig,
system: ActorSystem): Future[P2PClient] = { system: ActorSystem): Future[P2PClient] = {
val clientProps = props(peer, val clientProps = props(
peerMessageReceiver, peer = peer,
onReconnect, peerMsgHandlerReceiver = peerMessageReceiver,
onStop, peerMsgRecvState = peerMsgRecvState,
maxReconnectionTries) onReconnect = onReconnect,
onStop = onStop,
onInitializationTimeout = onInitializationTimeout,
onQueryTimeout = onQueryTimeout,
sendResponseTimeout = sendResponseTimeout,
maxReconnectionTries = maxReconnectionTries
)
import system.dispatcher import system.dispatcher
implicit val timeout: Timeout = Timeout(10.second) implicit val timeout: Timeout = Timeout(10.second)

View file

@ -16,8 +16,7 @@ case class ControlMessageHandler(node: Node)(implicit ec: ExecutionContext)
payload: ControlPayload, payload: ControlPayload,
sender: PeerMessageSender, sender: PeerMessageSender,
peer: Peer, peer: Peer,
peerMessageReceiver: PeerMessageReceiver): Future[PeerMessageReceiver] = { state: PeerMessageReceiverState): Future[PeerMessageReceiverState] = {
val state = peerMessageReceiver.state
payload match { payload match {
case versionMsg: VersionMessage => case versionMsg: VersionMessage =>
@ -34,11 +33,9 @@ case class ControlMessageHandler(node: Node)(implicit ec: ExecutionContext)
case good: Initializing => case good: Initializing =>
val newState = good.withVersionMsg(versionMsg) val newState = good.withVersionMsg(versionMsg)
val newRecv = peerMessageReceiver.toState(newState)
node.peerManager.onVersionMessage(peer, versionMsg) node.peerManager.onVersionMessage(peer, versionMsg)
sender.sendVerackMessage().map(_ => newRecv) sender.sendVerackMessage().map(_ => newState)
} }
case VerAckMessage => case VerAckMessage =>
@ -52,33 +49,32 @@ case class ControlMessageHandler(node: Node)(implicit ec: ExecutionContext)
case good: Initializing => case good: Initializing =>
val newState = good.toNormal(VerAckMessage) val newState = good.toNormal(VerAckMessage)
val newRecv = peerMessageReceiver.toState(newState)
node.peerManager.onInitialization(peer).map(_ => newRecv) node.peerManager.onInitialization(peer).map(_ => newState)
} }
case ping: PingMessage => case ping: PingMessage =>
sender.sendPong(ping).map { _ => sender.sendPong(ping).map { _ =>
peerMessageReceiver state
} }
case SendHeadersMessage => case SendHeadersMessage =>
//we want peers to just send us headers //we want peers to just send us headers
//we don't want to have to request them manually //we don't want to have to request them manually
sender.sendHeadersMessage().map(_ => peerMessageReceiver) sender.sendHeadersMessage().map(_ => state)
case msg: GossipAddrMessage => case msg: GossipAddrMessage =>
handleGossipAddrMessage(msg) handleGossipAddrMessage(msg)
Future.successful(peerMessageReceiver) Future.successful(state)
case SendAddrV2Message => case SendAddrV2Message =>
sender.sendSendAddrV2Message().map(_ => peerMessageReceiver) sender.sendSendAddrV2Message().map(_ => state)
case _ @(_: FilterAddMessage | _: FilterLoadMessage | case _ @(_: FilterAddMessage | _: FilterLoadMessage |
FilterClearMessage) => FilterClearMessage) =>
Future.successful(peerMessageReceiver) Future.successful(state)
case _ @(GetAddrMessage | _: PongMessage) => case _ @(GetAddrMessage | _: PongMessage) =>
Future.successful(peerMessageReceiver) Future.successful(state)
case _: RejectMessage => case _: RejectMessage =>
Future.successful(peerMessageReceiver) Future.successful(state)
case _: FeeFilterMessage => case _: FeeFilterMessage =>
Future.successful(peerMessageReceiver) Future.successful(state)
} }
} }

View file

@ -19,7 +19,6 @@ import scala.concurrent.Future
*/ */
class PeerMessageReceiver( class PeerMessageReceiver(
node: Node, node: Node,
val state: PeerMessageReceiverState,
peer: Peer peer: Peer
)(implicit system: ActorSystem, nodeAppConfig: NodeAppConfig) )(implicit system: ActorSystem, nodeAppConfig: NodeAppConfig)
extends P2PLogger { extends P2PLogger {
@ -28,155 +27,9 @@ class PeerMessageReceiver(
require(nodeAppConfig.nodeType != NodeType.BitcoindBackend, require(nodeAppConfig.nodeType != NodeType.BitcoindBackend,
"Bitcoind should handle the P2P interactions") "Bitcoind should handle the P2P interactions")
/** This method is called when we have received
* a [[akka.io.Tcp.Connected]] message from our peer
* This means we have opened a Tcp connection,
* but have NOT started the handshake
* This method will initiate the handshake
*/
protected[networking] def connect(client: P2PClient): PeerMessageReceiver = {
state match {
case bad @ (_: Initializing | _: Normal | _: InitializedDisconnect |
_: InitializedDisconnectDone | _: Disconnected | _: StoppedReconnect |
_: Waiting) =>
throw new RuntimeException(s"Cannot call connect when in state=${bad}")
case Preconnection =>
logger.debug(s"Connection established with peer=${peer}")
val initializationTimeoutCancellable =
system.scheduler.scheduleOnce(nodeAppConfig.initializationTimeout) {
val timeoutF = onInitTimeout()
timeoutF.failed.foreach(err =>
logger.error(s"Failed to initialize timeout for peer=$peer", err))
}
val newState =
Preconnection.toInitializing(client, initializationTimeoutCancellable)
val peerMsgSender = PeerMessageSender(client)
peerMsgSender.sendVersionMessage(node.getDataMessageHandler.chainApi)
val newRecv = toState(newState)
newRecv
}
}
/** Initializes the disconnection from our peer on the network.
* This is different than [[disconnect()]] as that indicates the
* peer initialized a disconnection from us
*/
private[networking] def initializeDisconnect(): PeerMessageReceiver = {
logger.debug(s"Initializing disconnect from $peer")
state match {
case good @ (_: Disconnected) =>
//if its already disconnected, just say init disconnect done so it wont reconnect
logger.debug(s"Init disconnect called for already disconnected $peer")
val newState = InitializedDisconnectDone(
clientConnectP = good.clientConnectP,
clientDisconnectP = good.clientDisconnectP,
versionMsgP = good.versionMsgP,
verackMsgP = good.verackMsgP)
new PeerMessageReceiver(node, newState, peer)
case bad @ (_: InitializedDisconnectDone | Preconnection |
_: StoppedReconnect) =>
throw new RuntimeException(
s"Cannot initialize disconnect from peer=$peer when in state=$bad")
case _: InitializedDisconnect =>
logger.warn(
s"Already initialized disconnected from peer=$peer, this is a noop")
this
case state @ (_: Initializing | _: Normal) =>
val newState = InitializedDisconnect(state.clientConnectP,
state.clientDisconnectP,
state.versionMsgP,
state.verackMsgP)
toState(newState)
case state: Waiting =>
val newState = InitializedDisconnect(state.clientConnectP,
state.clientDisconnectP,
state.versionMsgP,
state.verackMsgP)
toState(newState)
}
}
def stopReconnect(): PeerMessageReceiver = {
state match {
case Preconnection =>
//when retry, state should be back to preconnection
val newState = StoppedReconnect(state.clientConnectP,
state.clientDisconnectP,
state.versionMsgP,
state.verackMsgP)
val newRecv = toState(newState)
newRecv
case _: StoppedReconnect =>
logger.warn(
s"Already stopping reconnect from peer=$peer, this is a noop")
this
case bad @ (_: Initializing | _: Normal | _: InitializedDisconnect |
_: InitializedDisconnectDone | _: Disconnected | _: Waiting) =>
throw new RuntimeException(
s"Cannot stop reconnect from peer=$peer when in state=$bad")
}
}
protected[networking] def disconnect(): Future[PeerMessageReceiver] = {
logger.trace(s"Disconnecting with internalstate=${state}")
state match {
case bad @ (_: Disconnected | Preconnection |
_: InitializedDisconnectDone | _: StoppedReconnect) =>
throw new RuntimeException(
s"Cannot disconnect from peer=${peer} when in state=${bad}")
case good: InitializedDisconnect =>
val newState = InitializedDisconnectDone(
clientConnectP = good.clientConnectP,
clientDisconnectP = good.clientDisconnectP.success(()),
versionMsgP = good.versionMsgP,
verackMsgP = good.verackMsgP)
val newReceiver = new PeerMessageReceiver(node, newState, peer)
Future.successful(newReceiver)
case good @ (_: Initializing | _: Normal | _: Waiting) =>
val handleF: Future[Unit] = good match {
case wait: Waiting =>
onResponseTimeout(wait.responseFor).map(_ => ())
case wait: Initializing =>
wait.initializationTimeoutCancellable.cancel()
Future.unit
case _ => Future.unit
}
logger.debug(s"Disconnected bitcoin peer=${peer}")
val newState = Disconnected(
clientConnectP = good.clientConnectP,
clientDisconnectP = good.clientDisconnectP.success(()),
versionMsgP = good.versionMsgP,
verackMsgP = good.verackMsgP
)
val newReceiver = new PeerMessageReceiver(node, newState, peer)
handleF.map(_ => newReceiver)
}
}
private[networking] def isConnected: Boolean = state.isConnected
private[networking] def isDisconnected: Boolean = state.isDisconnected
private[networking] def hasReceivedVersionMsg: Boolean =
state.hasReceivedVersionMsg.isCompleted
private[networking] def hasReceivedVerackMsg: Boolean =
state.hasReceivedVerackMsg.isCompleted
private[networking] def isInitialized: Boolean = state.isInitialized
def handleNetworkMessageReceived( def handleNetworkMessageReceived(
networkMsgRecv: PeerMessageReceiver.NetworkMessageReceived): Future[ networkMsgRecv: PeerMessageReceiver.NetworkMessageReceived,
PeerMessageReceiver] = { state: PeerMessageReceiverState): Future[PeerMessageReceiverState] = {
val client = networkMsgRecv.client val client = networkMsgRecv.client
@ -189,7 +42,7 @@ class PeerMessageReceiver(
val payload = networkMsgRecv.msg.payload val payload = networkMsgRecv.msg.payload
//todo: this works but doesn't seem to be the best place to do this //todo: this works but doesn't seem to be the best place to do this
val curReceiver: PeerMessageReceiver = { val curState: PeerMessageReceiverState = {
state match { state match {
case state: Waiting => case state: Waiting =>
val responseFor = state.responseFor.asInstanceOf[ExpectsResponse] val responseFor = state.responseFor.asInstanceOf[ExpectsResponse]
@ -202,13 +55,13 @@ class PeerMessageReceiver(
state.clientDisconnectP, state.clientDisconnectP,
state.versionMsgP, state.versionMsgP,
state.verackMsgP) state.verackMsgP)
toState(newState) newState
} else this } else state
case state: Initializing => case state: Initializing =>
if (payload == VerAckMessage) if (payload == VerAckMessage)
state.initializationTimeoutCancellable.cancel() state.initializationTimeoutCancellable.cancel()
this state
case _ => this case _ => state
} }
} }
@ -216,11 +69,10 @@ class PeerMessageReceiver(
case controlPayload: ControlPayload => case controlPayload: ControlPayload =>
handleControlPayload(payload = controlPayload, handleControlPayload(payload = controlPayload,
sender = peerMsgSender, sender = peerMsgSender,
curReceiver) curReceiverState = curState)
case dataPayload: DataPayload => case dataPayload: DataPayload =>
handleDataPayload(payload = dataPayload, handleDataPayload(payload = dataPayload, sender = peerMsgSender)
sender = peerMsgSender, .map(_ => curState)
curReceiver)
} }
} }
@ -233,13 +85,12 @@ class PeerMessageReceiver(
*/ */
private def handleDataPayload( private def handleDataPayload(
payload: DataPayload, payload: DataPayload,
sender: PeerMessageSender, sender: PeerMessageSender): Future[PeerMessageReceiver] = {
curReceiver: PeerMessageReceiver): Future[PeerMessageReceiver] = {
//else it means we are receiving this data payload from a peer, //else it means we are receiving this data payload from a peer,
//we need to handle it //we need to handle it
node.getDataMessageHandler node.getDataMessageHandler
.addToStream(payload, sender, peer) .addToStream(payload, sender, peer)
.map(_ => new PeerMessageReceiver(node, curReceiver.state, peer)) .map(_ => new PeerMessageReceiver(node, peer))
} }
/** Handles control payloads defined here https://bitcoin.org/en/developer-reference#control-messages /** Handles control payloads defined here https://bitcoin.org/en/developer-reference#control-messages
@ -251,99 +102,10 @@ class PeerMessageReceiver(
private def handleControlPayload( private def handleControlPayload(
payload: ControlPayload, payload: ControlPayload,
sender: PeerMessageSender, sender: PeerMessageSender,
curReceiver: PeerMessageReceiver): Future[PeerMessageReceiver] = { curReceiverState: PeerMessageReceiverState): Future[
PeerMessageReceiverState] = {
node.controlMessageHandler node.controlMessageHandler
.handleControlPayload(payload, sender, peer, curReceiver) .handleControlPayload(payload, sender, peer, curReceiverState)
}
private def onInitTimeout(): Future[Unit] = {
logger.debug(s"Init timeout for peer $peer")
node.peerManager.onInitializationTimeout(peer)
}
def onResponseTimeout(
networkPayload: NetworkPayload): Future[PeerMessageReceiver] = {
assert(networkPayload.isInstanceOf[ExpectsResponse])
logger.info(
s"Handling response timeout for ${networkPayload.commandName} from $peer")
//isn't this redundant? No, on response timeout may be called when not cancel timeout
state match {
case wait: Waiting => wait.expectedResponseCancellable.cancel()
case _ =>
}
networkPayload match {
case payload: ExpectsResponse =>
logger.info(
s"Response for ${payload.commandName} from $peer timed out in state $state")
node.peerManager.onQueryTimeout(payload, peer).map { _ =>
state match {
case _: Waiting if state.isConnected && state.isInitialized =>
val newState =
Normal(state.clientConnectP,
state.clientDisconnectP,
state.versionMsgP,
state.verackMsgP)
toState(newState)
case _: PeerMessageReceiverState => this
}
}
case _ =>
logger.error(
s"onResponseTimeout called for ${networkPayload.commandName} which does not expect response")
Future.successful(this)
}
}
def handleExpectResponse(msg: NetworkPayload): Future[PeerMessageReceiver] = {
require(
msg.isInstanceOf[ExpectsResponse],
s"Cannot expect response for ${msg.commandName} from $peer as ${msg.commandName} does not expect a response.")
state match {
case good: Normal =>
logger.debug(s"Handling expected response for ${msg.commandName}")
val expectedResponseCancellable =
system.scheduler.scheduleOnce(nodeAppConfig.queryWaitTime) {
val responseTimeoutF =
node.peerManager.sendResponseTimeout(peer, msg)
responseTimeoutF.failed.foreach(err =>
logger.error(
s"Failed to timeout waiting for response for peer=$peer",
err))
}
val newState = Waiting(
clientConnectP = good.clientConnectP,
clientDisconnectP = good.clientDisconnectP,
versionMsgP = good.versionMsgP,
verackMsgP = good.verackMsgP,
responseFor = msg,
waitingSince = System.currentTimeMillis(),
expectedResponseCancellable = expectedResponseCancellable
)
Future.successful(toState(newState))
case state: Waiting =>
logger.debug(
s"Waiting for response to ${state.responseFor.commandName}. Ignoring next request for ${msg.commandName}")
Future.successful(this)
case bad @ (_: InitializedDisconnect | _: InitializedDisconnectDone |
_: StoppedReconnect) =>
throw new RuntimeException(
s"Cannot expect response for ${msg.commandName} in state $bad")
case Preconnection | _: Initializing | _: Disconnected =>
//so we sent a message when things were good, but not we are back to connecting?
//can happen when can happen where once we initialize the remote peer immediately disconnects us
onResponseTimeout(msg)
}
}
/** Transitions our PeerMessageReceiver to a new state */
def toState(newState: PeerMessageReceiverState): PeerMessageReceiver = {
new PeerMessageReceiver(
node = node,
state = newState,
peer = peer
)
} }
} }
@ -360,31 +122,10 @@ object PeerMessageReceiver {
case class NetworkMessageReceived(msg: NetworkMessage, client: P2PClient) case class NetworkMessageReceived(msg: NetworkMessage, client: P2PClient)
extends PeerMessageReceiverMsg extends PeerMessageReceiverMsg
def apply(state: PeerMessageReceiverState, node: Node, peer: Peer)(implicit def apply(node: Node, peer: Peer)(implicit
system: ActorSystem, system: ActorSystem,
nodeAppConfig: NodeAppConfig nodeAppConfig: NodeAppConfig
): PeerMessageReceiver = { ): PeerMessageReceiver = {
new PeerMessageReceiver(node = node, state = state, peer = peer) new PeerMessageReceiver(node = node, peer = peer)
}
/** Creates a peer message receiver that is ready
* to be connected to a peer. This can be given to [[org.bitcoins.node.networking.P2PClient.props() P2PClient]]
* to connect to a peer on the network
*/
def preConnection(peer: Peer, node: Node)(implicit
system: ActorSystem,
nodeAppConfig: NodeAppConfig
): PeerMessageReceiver = {
PeerMessageReceiver(node = node,
state = PeerMessageReceiverState.fresh(),
peer = peer)
}
def newReceiver(node: Node, peer: Peer)(implicit
nodeAppConfig: NodeAppConfig,
system: ActorSystem): PeerMessageReceiver = {
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
node = node,
peer = peer)
} }
} }

View file

@ -1,11 +1,30 @@
package org.bitcoins.node.networking.peer package org.bitcoins.node.networking.peer
import akka.actor.Cancellable import akka.actor.{ActorSystem, Cancellable}
import grizzled.slf4j.Logging import grizzled.slf4j.Logging
import org.bitcoins.core.p2p.{NetworkPayload, VerAckMessage, VersionMessage} import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.p2p.{
ExpectsResponse,
NetworkPayload,
VerAckMessage,
VersionMessage
}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.networking.peer.PeerMessageReceiverState.{
Disconnected,
InitializedDisconnect,
InitializedDisconnectDone,
Initializing,
Normal,
Preconnection,
StoppedReconnect,
Waiting
}
import scala.concurrent.{Future, Promise} import scala.concurrent.{ExecutionContext, Future, Promise}
sealed abstract class PeerMessageReceiverState extends Logging { sealed abstract class PeerMessageReceiverState extends Logging {
@ -74,6 +93,233 @@ sealed abstract class PeerMessageReceiverState extends Logging {
def isInitialized: Boolean = { def isInitialized: Boolean = {
hasReceivedVersionMsg.isCompleted && hasReceivedVerackMsg.isCompleted hasReceivedVersionMsg.isCompleted && hasReceivedVerackMsg.isCompleted
} }
/** This method is called when we have received
* a [[akka.io.Tcp.Connected]] message from our peer
* This means we have opened a Tcp connection,
* but have NOT started the handshake
* This method will initiate the handshake
*/
protected[networking] def connect(
client: P2PClient,
onInitializationTimeout: Peer => Future[Unit])(implicit
system: ActorSystem,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig): PeerMessageReceiverState.Initializing = {
import system.dispatcher
val peer = client.peer
this match {
case bad @ (_: Initializing | _: Normal | _: InitializedDisconnect |
_: InitializedDisconnectDone | _: Disconnected | _: StoppedReconnect |
_: Waiting) =>
throw new RuntimeException(s"Cannot call connect when in state=${bad}")
case Preconnection =>
logger.debug(s"Connection established with peer=${client.peer}")
val initializationTimeoutCancellable =
system.scheduler.scheduleOnce(nodeAppConfig.initializationTimeout) {
val timeoutF = onInitializationTimeout(peer)
timeoutF.failed.foreach(err =>
logger.error(s"Failed to initialize timeout for peer=$peer", err))
}
val newState =
Preconnection.toInitializing(client, initializationTimeoutCancellable)
val peerMsgSender = PeerMessageSender(client)
val chainApi = ChainHandler.fromDatabase()
peerMsgSender.sendVersionMessage(chainApi)
newState
}
}
/** Initializes the disconnection from our peer on the network.
* This is different than [[disconnect()]] as that indicates the
* peer initialized a disconnection from us
*/
private[networking] def initializeDisconnect(
peer: Peer): PeerMessageReceiverState = {
logger.debug(s"Initializing disconnect from $peer")
this match {
case good @ (_: Disconnected) =>
//if its already disconnected, just say init disconnect done so it wont reconnect
logger.debug(s"Init disconnect called for already disconnected $peer")
val newState = InitializedDisconnectDone(
clientConnectP = good.clientConnectP,
clientDisconnectP = good.clientDisconnectP,
versionMsgP = good.versionMsgP,
verackMsgP = good.verackMsgP)
newState
case bad @ (_: InitializedDisconnectDone | Preconnection |
_: StoppedReconnect) =>
throw new RuntimeException(
s"Cannot initialize disconnect from peer=$peer when in state=$bad")
case _: InitializedDisconnect =>
logger.warn(
s"Already initialized disconnected from peer=$peer, this is a noop")
this
case state @ (_: Initializing | _: Normal) =>
val newState = InitializedDisconnect(state.clientConnectP,
state.clientDisconnectP,
state.versionMsgP,
state.verackMsgP)
newState
case state: Waiting =>
val newState = InitializedDisconnect(state.clientConnectP,
state.clientDisconnectP,
state.versionMsgP,
state.verackMsgP)
newState
}
}
protected[networking] def disconnect(
peer: Peer,
onQueryTimeout: (ExpectsResponse, Peer) => Future[Unit])(implicit
system: ActorSystem): Future[PeerMessageReceiverState] = {
import system.dispatcher
logger.trace(s"Disconnecting with internalstate=${this}")
this match {
case bad @ (_: Disconnected | Preconnection |
_: InitializedDisconnectDone | _: StoppedReconnect) =>
throw new RuntimeException(
s"Cannot disconnect from peer=${peer} when in state=${bad}")
case good: InitializedDisconnect =>
val newState = InitializedDisconnectDone(
clientConnectP = good.clientConnectP,
clientDisconnectP = good.clientDisconnectP.success(()),
versionMsgP = good.versionMsgP,
verackMsgP = good.verackMsgP)
Future.successful(newState)
case good @ (_: Initializing | _: Normal | _: Waiting) =>
val handleF: Future[Unit] = good match {
case wait: Waiting =>
onResponseTimeout(wait.responseFor, peer, onQueryTimeout).map(_ =>
())
case wait: Initializing =>
wait.initializationTimeoutCancellable.cancel()
Future.unit
case _ => Future.unit
}
logger.debug(s"Disconnected bitcoin peer=${peer}")
val newState = Disconnected(
clientConnectP = good.clientConnectP,
clientDisconnectP = good.clientDisconnectP.success(()),
versionMsgP = good.versionMsgP,
verackMsgP = good.verackMsgP
)
handleF.map(_ => newState)
}
}
def onResponseTimeout(
networkPayload: NetworkPayload,
peer: Peer,
onQueryTimeout: (ExpectsResponse, Peer) => Future[Unit])(implicit
ec: ExecutionContext): Future[PeerMessageReceiverState] = {
require(networkPayload.isInstanceOf[ExpectsResponse])
logger.info(
s"Handling response timeout for ${networkPayload.commandName} from $peer")
//isn't this redundant? No, on response timeout may be called when not cancel timeout
this match {
case wait: Waiting => wait.expectedResponseCancellable.cancel()
case _ =>
}
networkPayload match {
case payload: ExpectsResponse =>
logger.info(
s"Response for ${payload.commandName} from $peer timed out in state $this")
onQueryTimeout(payload, peer).map { _ =>
this match {
case _: Waiting if isConnected && isInitialized =>
val newState =
Normal(clientConnectP,
clientDisconnectP,
versionMsgP,
verackMsgP)
newState
case _: PeerMessageReceiverState => this
}
}
case _ =>
logger.error(
s"onResponseTimeout called for ${networkPayload.commandName} which does not expect response")
Future.successful(this)
}
}
def handleExpectResponse(
msg: NetworkPayload,
peer: Peer,
sendResponseTimeout: (Peer, NetworkPayload) => Future[Unit],
onQueryTimeout: (ExpectsResponse, Peer) => Future[Unit])(implicit
system: ActorSystem,
nodeAppConfig: NodeAppConfig): Future[PeerMessageReceiverState] = {
require(
msg.isInstanceOf[ExpectsResponse],
s"Cannot expect response for ${msg.commandName} from $peer as ${msg.commandName} does not expect a response.")
import system.dispatcher
this match {
case good: Normal =>
logger.debug(s"Handling expected response for ${msg.commandName}")
val expectedResponseCancellable =
system.scheduler.scheduleOnce(nodeAppConfig.queryWaitTime) {
val responseTimeoutF =
sendResponseTimeout(peer, msg)
responseTimeoutF.failed.foreach(err =>
logger.error(
s"Failed to timeout waiting for response for peer=$peer",
err))
}
val newState = Waiting(
clientConnectP = good.clientConnectP,
clientDisconnectP = good.clientDisconnectP,
versionMsgP = good.versionMsgP,
verackMsgP = good.verackMsgP,
responseFor = msg,
waitingSince = System.currentTimeMillis(),
expectedResponseCancellable = expectedResponseCancellable
)
Future.successful(newState)
case state: Waiting =>
logger.debug(
s"Waiting for response to ${state.responseFor.commandName}. Ignoring next request for ${msg.commandName}")
Future.successful(this)
case bad @ (_: InitializedDisconnect | _: InitializedDisconnectDone |
_: StoppedReconnect) =>
throw new RuntimeException(
s"Cannot expect response for ${msg.commandName} in state $bad")
case Preconnection | _: Initializing | _: Disconnected =>
//so we sent a message when things were good, but not we are back to connecting?
//can happen when can happen where once we initialize the remote peer immediately disconnects us
onResponseTimeout(msg, peer, onQueryTimeout = onQueryTimeout)
}
}
def stopReconnect(peer: Peer): PeerMessageReceiverState = {
this match {
case Preconnection =>
//when retry, state should be back to preconnection
val newState = StoppedReconnect(clientConnectP,
clientDisconnectP,
versionMsgP,
verackMsgP)
newState
case _: StoppedReconnect =>
logger.warn(
s"Already stopping reconnect from peer=$peer, this is a noop")
this
case bad @ (_: Initializing | _: Normal | _: InitializedDisconnect |
_: InitializedDisconnectDone | _: Disconnected | _: Waiting) =>
throw new RuntimeException(
s"Cannot stop reconnect from peer=$peer when in state=$bad")
}
}
} }
object PeerMessageReceiverState { object PeerMessageReceiverState {

View file

@ -1,11 +1,15 @@
package org.bitcoins.testkit.node package org.bitcoins.testkit.node
import akka.actor.{ActorRef, ActorSystem} import akka.actor.{ActorRef, ActorSystem}
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.crypto.DoubleSha256DigestBE import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.networking.peer.PeerMessageReceiver import org.bitcoins.node.networking.peer.{
PeerMessageReceiver,
PeerMessageReceiverState
}
import org.bitcoins.node.{NeutrinoNode, Node, P2PLogger} import org.bitcoins.node.{NeutrinoNode, Node, P2PLogger}
import org.bitcoins.rpc.client.common.BitcoindRpcClient import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.testkit.async.TestAsyncUtil import org.bitcoins.testkit.async.TestAsyncUtil
@ -22,14 +26,19 @@ abstract class NodeTestUtil extends P2PLogger {
peer: Peer, peer: Peer,
peerMsgReceiver: PeerMessageReceiver, peerMsgReceiver: PeerMessageReceiver,
supervisor: ActorRef)(implicit supervisor: ActorRef)(implicit
conf: NodeAppConfig, nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig,
system: ActorSystem system: ActorSystem
): Future[P2PClient] = { ): Future[P2PClient] = {
P2PClient.apply( P2PClient.apply(
peer = peer, peer = peer,
peerMessageReceiver = peerMsgReceiver, peerMessageReceiver = peerMsgReceiver,
peerMsgRecvState = PeerMessageReceiverState.fresh(),
onReconnect = (_: Peer) => Future.unit, onReconnect = (_: Peer) => Future.unit,
onStop = (_: Peer) => Future.unit, onStop = (_: Peer) => Future.unit,
onInitializationTimeout = (_: Peer) => Future.unit,
onQueryTimeout = (_, _) => Future.unit,
sendResponseTimeout = (_, _) => Future.unit,
maxReconnectionTries = 16, maxReconnectionTries = 16,
supervisor = supervisor supervisor = supervisor
) )

View file

@ -209,7 +209,6 @@ object NodeUnitTest extends P2PLogger {
system: ActorSystem): Future[PeerMessageReceiver] = { system: ActorSystem): Future[PeerMessageReceiver] = {
val receiver = val receiver =
PeerMessageReceiver( PeerMessageReceiver(
state = PeerMessageReceiverState.fresh(),
node = node =
buildNode(peer, chainApi, walletCreationTimeOpt)(appConfig.chainConf, buildNode(peer, chainApi, walletCreationTimeOpt)(appConfig.chainConf,
appConfig.nodeConf, appConfig.nodeConf,
@ -390,8 +389,7 @@ object NodeUnitTest extends P2PLogger {
chainAppConfig: ChainAppConfig, chainAppConfig: ChainAppConfig,
system: ActorSystem): Future[PeerMessageReceiver] = { system: ActorSystem): Future[PeerMessageReceiver] = {
val receiver = val receiver =
PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), PeerMessageReceiver(node =
node =
buildNode(peer, chainApi, walletCreationTimeOpt), buildNode(peer, chainApi, walletCreationTimeOpt),
peer = peer) peer = peer)
Future.successful(receiver) Future.successful(receiver)