Add P2PClientSupervisor (#4509)

* add P2PClientSupervisor

* changes from comments: made P2PClient Future

* empty commit to see if mac failures are consistent on ci

* changes from comments

* changes from comments
This commit is contained in:
Shreyansh 2022-08-04 20:36:04 +05:30 committed by GitHub
parent a02e25b0ce
commit c4d358061a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 315 additions and 167 deletions

View File

@ -52,12 +52,14 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
val connAndInit = for {
_ <- AsyncUtil
.retryUntilSatisfied(peers.size == 2, interval = 1.second, maxTries = 5)
.retryUntilSatisfied(peers.size == 2,
maxTries = 30,
interval = 1.second)
_ <- Future
.sequence(peers.map(peerManager.isConnected))
.flatMap(p => assert(p.forall(_ == true)))
res <- Future
.sequence(peers.map(peerManager.isConnected))
.sequence(peers.map(peerManager.isInitialized))
.flatMap(p => assert(p.forall(_ == true)))
} yield res
@ -80,7 +82,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
def has2Peers: Future[Unit] =
AsyncUtil.retryUntilSatisfied(peers.size == 2,
interval = 1.second,
maxTries = 5)
maxTries = 30)
def bothOurs: Future[Assertion] = ourPeersF.map { ours =>
assert(ours.map(peers.contains(_)).forall(_ == true))
}
@ -144,7 +146,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
_ <- AsyncUtil
.retryUntilSatisfied(peers.size == 2,
interval = 1.second,
maxTries = 5)
maxTries = 30)
_ <- Future
.sequence(peers.map(peerManager.isConnected))
.flatMap(p => assert(p.forall(_ == true)))
@ -178,7 +180,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
_ <- AsyncUtil
.retryUntilSatisfied(peers.size == 2,
interval = 1.second,
maxTries = 5)
maxTries = 30)
_ <- NodeUnitTest.syncNeutrinoNode(node, bitcoind)
_ <- Future
.sequence(peers.map(peerManager.isConnected))

View File

@ -13,6 +13,7 @@ import org.bitcoins.testkit.tor.CachedTor
import org.bitcoins.testkit.util.TorUtil
import org.scalatest.{FutureOutcome, Outcome}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
/** Neutrino node tests that require changing the state of bitcoind instance */
@ -59,7 +60,9 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
for {
bitcoindPeers <- bitcoinPeersF
_ <- AsyncUtil.retryUntilSatisfied(peers.size == 2)
_ <- AsyncUtil.retryUntilSatisfied(peers.size == 2,
maxTries = 30,
interval = 1.second)
//sync from first bitcoind
_ = node.updateDataMessageHandler(
node.getDataMessageHandler.copy(syncPeer = Some(bitcoindPeers(0)))(
@ -69,7 +72,8 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
expectHeaders = ExpectResponseCommand(
GetHeadersMessage(node.chainConfig.chain.genesisHash))
//waiting for response to header query now
_ = peerManager.peerData(bitcoindPeers(0)).client.actor ! expectHeaders
client <- peerManager.peerData(bitcoindPeers(0)).client
_ = client.actor ! expectHeaders
nodeUri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoinds(0))
_ <- bitcoinds(0).disconnectNode(nodeUri)
//should now sync from bitcoinds(1)

View File

@ -18,6 +18,7 @@ import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest.{FutureOutcome, Outcome}
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest {
@ -113,7 +114,9 @@ class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest {
_ <- NodeTestUtil.awaitSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFilterHeadersSync(node, bitcoind)
_ <- NodeTestUtil.awaitCompactFiltersSync(node, bitcoind)
_ <- TestAsyncUtil.awaitConditionF(condition2)
_ <- TestAsyncUtil.awaitConditionF(condition2,
interval = 1.second,
maxTries = 30)
// assert we got the full tx with witness data
txs <- wallet.listTransactions()
} yield assert(txs.exists(_.transaction == expectedTx))

View File

@ -34,7 +34,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
val NeutrinoNodeConnectedWithBitcoindV22(node, _) = param
val peer = node.peerManager.peers.head
val sender = node.peerMsgSenders(0)
val senderF = node.peerMsgSenders(0)
for {
chainApi <- node.chainApiFromDb()
dataMessageHandler = DataMessageHandler(chainApi,
@ -52,6 +52,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
_ <- recoverToSucceededIf[RuntimeException](
chainApi.processHeaders(invalidPayload.headers))
sender <- senderF
// Verify we handle the payload correctly
_ <- dataMessageHandler.handleDataPayload(invalidPayload, sender, peer)
} yield succeed
@ -70,7 +71,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
()
}
}
val sender = node.peerMsgSenders(0)
val senderF = node.peerMsgSenders(0)
for {
hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head)
@ -86,6 +87,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
node.executionContext,
node.nodeAppConfig,
node.chainConfig)
sender <- senderF
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
result <- resultP.future
} yield assert(result == block)
@ -107,7 +109,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
}
}
val sender = node.peerMsgSenders(0)
val senderF = node.peerMsgSenders(0)
for {
hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head)
header <- bitcoind.getBlockHeaderRaw(hash)
@ -122,7 +124,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
node.executionContext,
node.nodeAppConfig,
node.chainConfig)
sender <- senderF
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
result <- resultP.future
} yield assert(result == Vector(header))
@ -142,7 +144,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
()
}
}
val sender = node.peerMsgSenders(0)
val senderF = node.peerMsgSenders(0)
for {
hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head)
filter <- bitcoind.getBlockFilter(hash, FilterType.Basic)
@ -157,7 +159,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
node.executionContext,
node.nodeAppConfig,
node.chainConfig)
sender <- senderF
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
result <- resultP.future
} yield assert(result == Vector((hash.flip, filter.filter)))
@ -176,7 +178,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
()
}
}
val sender = node.peerMsgSenders(0)
val senderF = node.peerMsgSenders(0)
for {
@ -193,6 +195,7 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
node.executionContext,
node.nodeAppConfig,
node.chainConfig)
sender <- senderF
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
result <- resultP.future
} yield assert(result == tx)

View File

@ -72,12 +72,15 @@ class PeerMessageReceiverTest extends NodeTestWithCachedBitcoindNewest {
val peerMsgReceiver =
PeerMessageReceiver(normal, node, peer)(system, node.nodeAppConfig)
val newMsgReceiver = peerMsgReceiver.disconnect()
val newMsgReceiverF = peerMsgReceiver.disconnect()
newMsgReceiverF.map { newMsgReceiver =>
assert(
newMsgReceiver.state
.isInstanceOf[PeerMessageReceiverState.Disconnected])
assert(newMsgReceiver.isDisconnected)
}
assert(
newMsgReceiver.state
.isInstanceOf[PeerMessageReceiverState.Disconnected])
assert(newMsgReceiver.isDisconnected)
}
it must "change a peer message receiver to be initializing disconnect" in {
@ -118,12 +121,12 @@ class PeerMessageReceiverTest extends NodeTestWithCachedBitcoindNewest {
.isInstanceOf[PeerMessageReceiverState.InitializedDisconnect])
assert(!newMsgReceiver.isDisconnected)
val disconnectRecv = newMsgReceiver.disconnect()
assert(
disconnectRecv.state
.isInstanceOf[PeerMessageReceiverState.InitializedDisconnectDone])
assert(disconnectRecv.isDisconnected)
assert(disconnectRecv.state.clientDisconnectP.isCompleted)
newMsgReceiver.disconnect().map { disconnectRecv =>
assert(
disconnectRecv.state
.isInstanceOf[PeerMessageReceiverState.InitializedDisconnectDone])
assert(disconnectRecv.isDisconnected)
assert(disconnectRecv.state.clientDisconnectP.isCompleted)
}
}
}

View File

@ -83,7 +83,7 @@ case class NeutrinoNode(
_ = logger.info(s"Syncing with $syncPeer")
_ = updateDataMessageHandler(
dataMessageHandler.copy(syncPeer = Some(syncPeer)))
peerMsgSender = peerManager.peerData(syncPeer).peerMessageSender
peerMsgSender <- peerManager.peerData(syncPeer).peerMessageSender
bestHash <- chainApi.getBestBlockHash()
_ <- peerMsgSender.sendGetCompactFilterCheckPointMessage(stopHash =
@ -149,14 +149,13 @@ case class NeutrinoNode(
bestFilterOpt: Option[CompactFilterDb]): Future[Unit] = {
val syncPeer = dataMessageHandler.syncPeer.getOrElse(
throw new RuntimeException("Sync peer not set"))
val syncPeerMsgSender = peerManager.peerData(syncPeer).peerMessageSender
val sendCompactFilterHeaderMsgF = {
syncPeerMsgSender
.sendNextGetCompactFilterHeadersCommand(
chainApi = chainApi,
filterHeaderBatchSize = chainConfig.filterHeaderBatchSize,
prevStopHash = bestFilterHeader.blockHashBE)
}
val syncPeerMsgSenderF = peerManager.peerData(syncPeer).peerMessageSender
val sendCompactFilterHeaderMsgF = syncPeerMsgSenderF.flatMap(
_.sendNextGetCompactFilterHeadersCommand(
chainApi = chainApi,
filterHeaderBatchSize = chainConfig.filterHeaderBatchSize,
prevStopHash = bestFilterHeader.blockHashBE)
)
sendCompactFilterHeaderMsgF.flatMap { isSyncFilterHeaders =>
// If we have started syncing filters
if (
@ -166,12 +165,15 @@ case class NeutrinoNode(
) {
//means we are not syncing filter headers, and our filters are NOT
//in sync with our compact filter headers
syncPeerMsgSender
.sendNextGetCompactFilterCommand(
chainApi = chainApi,
filterBatchSize = chainConfig.filterBatchSize,
startHeight = bestFilterOpt.get.height)
.map(_ => ())
syncPeerMsgSenderF.flatMap { sender =>
sender
.sendNextGetCompactFilterCommand(chainApi = chainApi,
filterBatchSize =
chainConfig.filterBatchSize,
startHeight =
bestFilterOpt.get.height)
.map(_ => ())
}
} else {
Future.unit
}

View File

@ -72,9 +72,10 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
* object. Internally in [[org.bitcoins.node.networking.P2PClient p2p client]] you will see that
* the [[ChainApi chain api]] is updated inside of the p2p client
*/
def clients: Vector[P2PClient] = peerManager.clients
def clients: Vector[Future[P2PClient]] = peerManager.clients
def peerMsgSenders: Vector[PeerMessageSender] = peerManager.peerMsgSenders
def peerMsgSenders: Vector[Future[PeerMessageSender]] =
peerManager.peerMsgSenders
/** Sends the given P2P to our peer.
* This method is useful for playing around
@ -82,7 +83,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
* `private[node]`.
*/
def send(msg: NetworkPayload, peer: Peer): Future[Unit] = {
peerManager.peerData(peer).peerMessageSender.sendMsg(msg)
val senderF = peerManager.peerData(peer).peerMessageSender
senderF.flatMap(_.sendMsg(msg))
}
/** Starts our node */
@ -163,7 +165,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
if (connected) {
logger.info(s"Sending out tx message for tx=$txIds")
Future.sequence(
peerMsgSenders.map(_.sendInventoryMessage(transactions: _*)))
peerMsgSenders.map(
_.flatMap(_.sendInventoryMessage(transactions: _*))))
} else {
Future.failed(
new RuntimeException(
@ -186,7 +189,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
peerManager
.peerData(peer)
.peerMessageSender
.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock, blockHashes: _*)
.flatMap(_.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock,
blockHashes: _*))
case None =>
throw new RuntimeException(
"IBD not started yet. Cannot query for blocks.")

View File

@ -1,6 +1,6 @@
package org.bitcoins.node
import akka.actor.ActorSystem
import akka.actor.{ActorRef, ActorSystem}
import org.bitcoins.core.p2p.ServiceIdentifier
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
@ -10,27 +10,32 @@ import org.bitcoins.node.networking.peer.{
PeerMessageSender
}
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
/** PeerData contains objects specific to a peer associated together
*/
case class PeerData(
peer: Peer,
node: Node
node: Node,
supervisor: ActorRef
)(implicit system: ActorSystem, nodeAppConfig: NodeAppConfig) {
import system.dispatcher
lazy val peerMessageSender: PeerMessageSender = PeerMessageSender(client)
lazy val peerMessageSender: Future[PeerMessageSender] = {
client.map(PeerMessageSender(_))
}
lazy val client: P2PClient = {
lazy val client: Future[P2PClient] = {
val peerMessageReceiver =
PeerMessageReceiver.newReceiver(node = node, peer = peer)
P2PClient(
context = system,
peer = peer,
peerMessageReceiver = peerMessageReceiver,
onReconnect = node.peerManager.onReconnect,
onStop = node.peerManager.onP2PClientStopped,
maxReconnectionTries = 4
maxReconnectionTries = 4,
supervisor = supervisor
)
}

View File

@ -1,6 +1,7 @@
package org.bitcoins.node
import akka.actor.{ActorSystem, Cancellable}
import akka.actor.{ActorRef, ActorSystem, Cancellable}
import monix.execution.atomic.AtomicBoolean
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.p2p.ServiceIdentifier
import org.bitcoins.core.util.{NetworkUtil, StartStopAsync}
@ -12,12 +13,13 @@ import scala.collection.mutable
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source
import scala.util.Random
import scala.util.{Failure, Random, Success}
case class PeerFinder(
paramPeers: Vector[Peer],
node: NeutrinoNode,
skipPeers: () => Vector[Peer])(implicit
skipPeers: () => Vector[Peer],
supervisor: ActorRef)(implicit
ec: ExecutionContext,
system: ActorSystem,
nodeAppConfig: NodeAppConfig)
@ -116,12 +118,17 @@ case class PeerFinder(
else nodeAppConfig.tryNextPeersInterval
}
private val isConnectionSchedulerRunning = AtomicBoolean(false)
private lazy val peerConnectionScheduler: Cancellable =
system.scheduler.scheduleWithFixedDelay(
initialDelay = initialDelay,
delay = nodeAppConfig.tryNextPeersInterval) {
new Runnable() {
override def run(): Unit = {
delay = nodeAppConfig.tryNextPeersInterval) { () =>
{
if (
isConnectionSchedulerRunning.compareAndSet(expect = false,
update = true)
) {
logger.debug(s"Cache size: ${_peerData.size}. ${_peerData.keys}")
if (_peersToTry.size < 32)
_peersToTry.pushAll(getPeersFromDnsSeeds)
@ -130,7 +137,17 @@ case class PeerFinder(
.filterNot(p => skipPeers().contains(p) || _peerData.contains(p))
logger.debug(s"Trying next set of peers $peers")
peers.foreach(tryPeer)
val peersF = Future.sequence(peers.map(tryPeer))
peersF.onComplete {
case Success(_) =>
isConnectionSchedulerRunning.set(false)
case Failure(err) =>
isConnectionSchedulerRunning.set(false)
logger.error(s"Failed to connect to peers", err)
}
} else {
logger.warn(
s"Previous connection scheduler is still running, skipping this run, it will run again in ${nodeAppConfig.tryNextPeersInterval}")
}
}
}
@ -138,9 +155,10 @@ case class PeerFinder(
override def start(): Future[PeerFinder] = {
logger.debug(s"Starting PeerFinder")
(getPeersFromParam ++ getPeersFromConfig).distinct.foreach(tryPeer)
val initPeerF = Future.sequence(
(getPeersFromParam ++ getPeersFromConfig).distinct.map(tryPeer))
if (nodeAppConfig.enablePeerDiscovery) {
val peerDiscoveryF = if (nodeAppConfig.enablePeerDiscovery) {
val startedF = for {
(dbNonCf, dbCf) <- getPeersFromDb
} yield {
@ -158,6 +176,8 @@ case class PeerFinder(
logger.info("Peer discovery disabled.")
Future.successful(this)
}
initPeerF.flatMap(_ => peerDiscoveryF)
}
override def stop(): Future[PeerFinder] = {
@ -166,19 +186,22 @@ case class PeerFinder(
//delete try queue
_peersToTry.clear()
_peerData.foreach(_._2.client.close())
val closeFs = _peerData.map(_._2.client.map(_.close()))
val closeF = Future.sequence(closeFs)
AsyncUtil
val waitStopF = AsyncUtil
.retryUntilSatisfied(_peerData.isEmpty,
interval = 1.seconds,
maxTries = 10)
maxTries = 30)
.map(_ => this)
closeF.flatMap(_ => waitStopF)
}
/** creates and initialises a new test peer */
def tryPeer(peer: Peer): Unit = {
_peerData.put(peer, PeerData(peer, node))
_peerData(peer).peerMessageSender.connect()
def tryPeer(peer: Peer): Future[Unit] = {
_peerData.put(peer, PeerData(peer, node, supervisor))
_peerData(peer).peerMessageSender.map(_.connect())
}
def removePeer(peer: Peer): Unit = {

View File

@ -1,6 +1,6 @@
package org.bitcoins.node
import akka.actor.ActorSystem
import akka.actor.{ActorRef, ActorSystem, Props}
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.api.node.NodeType
import org.bitcoins.core.p2p.{
@ -12,8 +12,9 @@ import org.bitcoins.core.p2p.{
import org.bitcoins.core.util.{NetworkUtil, StartStopAsync}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.{Peer, PeerDAO, PeerDb}
import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.networking.peer.PeerMessageSender
import org.bitcoins.node.networking.{P2PClient, P2PClientSupervisor}
import org.bitcoins.node.util.BitcoinSNodeUtil
import scodec.bits.ByteVector
import java.net.InetAddress
@ -38,7 +39,13 @@ case class PeerManager(
private val _waitingForDeletion: mutable.Set[Peer] = mutable.Set.empty
def waitingForDeletion: Set[Peer] = _waitingForDeletion.toSet
val finder: PeerFinder = PeerFinder(paramPeers, node, skipPeers = () => peers)
val supervisor: ActorRef =
system.actorOf(Props[P2PClientSupervisor](),
name =
BitcoinSNodeUtil.createActorName("P2PClientSupervisor"))
val finder: PeerFinder =
PeerFinder(paramPeers, node, skipPeers = () => peers, supervisor)
def connectedPeerCount: Int = _peerData.size
@ -56,12 +63,13 @@ case class PeerManager(
def peers: Vector[Peer] = _peerData.keys.toVector
def peerMsgSenders: Vector[PeerMessageSender] =
def peerMsgSenders: Vector[Future[PeerMessageSender]] =
_peerData.values
.map(_.peerMessageSender)
.toVector
def clients: Vector[P2PClient] = _peerData.values.map(_.client).toVector
def clients: Vector[Future[P2PClient]] =
_peerData.values.map(_.client).toVector
def randomPeerWithService(services: ServiceIdentifier): Future[Peer] = {
//wait when requested
@ -88,7 +96,7 @@ case class PeerManager(
def randomPeerMsgSenderWithService(
services: ServiceIdentifier): Future[PeerMessageSender] = {
val randomPeerF = randomPeerWithService(services)
randomPeerF.map(peer => peerData(peer).peerMessageSender)
randomPeerF.flatMap(peer => peerData(peer).peerMessageSender)
}
def createInDb(
@ -145,7 +153,7 @@ case class PeerManager(
addPeer(withPeer)
}
def removePeer(peer: Peer): Unit = {
def removePeer(peer: Peer): Future[Unit] = {
logger.debug(s"Removing persistent peer $peer")
val client = peerData(peer).client
_peerData.remove(peer)
@ -154,7 +162,7 @@ case class PeerManager(
//leading to a memory leak may happen
_waitingForDeletion.add(peer)
//now send request to stop actor which will be completed some time in future
client.close()
client.map(_.close())
}
def isReconnection(peer: Peer): Boolean = {
@ -177,14 +185,15 @@ case class PeerManager(
val finderStopF = finder.stop()
peers.foreach(removePeer)
val removeF = Future.sequence(peers.map(removePeer))
val managerStopF = AsyncUtil.retryUntilSatisfied(
_peerData.isEmpty && waitingForDeletion.isEmpty,
interval = 1.seconds,
maxTries = 10)
maxTries = 30)
for {
_ <- removeF
_ <- finderStopF
_ <- managerStopF
} yield {
@ -196,30 +205,31 @@ case class PeerManager(
def isConnected(peer: Peer): Future[Boolean] = {
if (peerData.contains(peer))
peerData(peer).peerMessageSender.isConnected()
peerData(peer).peerMessageSender.flatMap(_.isConnected())
else Future.successful(false)
}
def isInitialized(peer: Peer): Future[Boolean] = {
if (peerData.contains(peer))
peerData(peer).peerMessageSender.isInitialized()
peerData(peer).peerMessageSender.flatMap(_.isInitialized())
else Future.successful(false)
}
def onInitializationTimeout(peer: Peer): Unit = {
def onInitializationTimeout(peer: Peer): Future[Unit] = {
assert(!finder.hasPeer(peer) || !peerData.contains(peer),
s"$peer cannot be both a test and a persistent peer")
if (finder.hasPeer(peer)) {
//one of the peers that we tried, failed to init within time, disconnect
finder.getData(peer).client.close()
finder.getData(peer).client.map(_.close())
} else if (peerData.contains(peer)) {
//this is one of our persistent peers which must have been initialized earlier, this can happen in case of
//a reconnection attempt, meaning it got connected but failed to initialize, disconnect
peerData(peer).client.close()
peerData(peer).client.map(_.close())
} else {
//this should never happen
logger.warn(s"onInitializationTimeout called for unknown $peer")
Future.unit
}
}
@ -238,7 +248,7 @@ case class PeerManager(
logger.debug(s"Initialized peer $peer with $hasCf")
def sendAddrReq: Future[Unit] =
finder.getData(peer).peerMessageSender.sendGetAddrMessage()
finder.getData(peer).peerMessageSender.flatMap(_.sendGetAddrMessage())
def managePeerF(): Future[Unit] = {
//if we have slots remaining, connect
@ -257,10 +267,11 @@ case class PeerManager(
//we do want to give it enough time to send addr messages
AsyncUtil
.nonBlockingSleep(duration = 10.seconds)
.map { _ =>
.flatMap { _ =>
//could have already been deleted in case of connection issues
if (finder.hasPeer(peer))
finder.getData(peer).client.close()
finder.getData(peer).client.map(_.close())
else Future.unit
}
}
}

View File

@ -1,12 +1,18 @@
package org.bitcoins.node.networking
import akka.actor.{Actor, ActorRef, ActorRefFactory, Props, Terminated}
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}
import akka.event.LoggingReceive
import akka.io.Tcp.SO.KeepAlive
import akka.io.{IO, Tcp}
import akka.pattern.ask
import akka.util.{ByteString, CompactByteString, Timeout}
import org.bitcoins.core.config.NetworkParameters
import org.bitcoins.core.p2p.{NetworkHeader, NetworkMessage, NetworkPayload}
import org.bitcoins.core.p2p.{
ExpectsResponse,
NetworkHeader,
NetworkMessage,
NetworkPayload
}
import org.bitcoins.core.util.{FutureUtil, NetworkUtil}
import org.bitcoins.node.P2PLogger
import org.bitcoins.node.config.NodeAppConfig
@ -26,7 +32,6 @@ import org.bitcoins.node.networking.peer.{
PeerMessageReceiver,
PeerMessageReceiverState
}
import org.bitcoins.node.util.BitcoinSNodeUtil
import org.bitcoins.tor.Socks5Connection.{Socks5Connect, Socks5Connected}
import org.bitcoins.tor.{Socks5Connection, Socks5ProxyParams}
import scodec.bits.ByteVector
@ -76,6 +81,8 @@ case class P2PClientActor(
extends Actor
with P2PLogger {
import context.dispatcher
private var currentPeerMsgHandlerRecv = initPeerMsgHandlerReceiver
private var reconnectHandlerOpt: Option[Peer => Future[Unit]] = None
@ -109,6 +116,12 @@ case class P2PClientActor(
unalignedBytes: ByteVector): Receive =
LoggingReceive {
case message: NetworkMessage =>
message match {
case _: ExpectsResponse =>
logger.debug(s"${message.payload.commandName} expects response")
Await.result(handleExpectResponse(message.payload), timeout)
case _ =>
}
sendNetworkMessage(message, peerConnection)
case payload: NetworkPayload =>
val networkMsg = NetworkMessage(network, payload)
@ -126,11 +139,33 @@ case class P2PClientActor(
case metaMsg: P2PClient.MetaMsg =>
sender() ! handleMetaMsg(metaMsg)
case ExpectResponseCommand(msg) =>
handleExpectResponse(msg)
Await.result(handleExpectResponse(msg), timeout)
case Terminated(actor) if actor == peerConnection =>
reconnect()
}
/** Behavior to ignore network messages. Used only when the peer is being disconnected by us as we would not want to
* process any messages from it in that state.
*/
private def ignoreNetworkMessages(
peerConnectionOpt: Option[ActorRef],
unalignedBytes: ByteVector): Receive = LoggingReceive {
case _ @(_: NetworkMessage | _: NetworkPayload |
_: ExpectResponseCommand) =>
case message: Tcp.Event if peerConnectionOpt.isDefined =>
val newUnalignedBytes =
handleEvent(message, peerConnectionOpt.get, unalignedBytes)
context.become(
ignoreNetworkMessages(peerConnectionOpt, newUnalignedBytes))
case _ @(P2PClient.CloseCommand | P2PClient.CloseAnyStateCommand) =>
//ignore
case metaMsg: P2PClient.MetaMsg =>
sender() ! handleMetaMsg(metaMsg)
case Terminated(actor)
if peerConnectionOpt.isDefined && actor == peerConnectionOpt.get =>
reconnect()
}
override def receive: Receive = LoggingReceive {
case cmd: NodeCommand =>
handleNodeCommand(cmd = cmd, peerConnectionOpt = None)
@ -141,8 +176,7 @@ case class P2PClientActor(
override def postStop(): Unit = {
super.postStop()
logger.debug(s"Stopped client for $peer")
implicit def ec: ExecutionContext = context.dispatcher
onStop(peer).foreach(_ => ())
Await.result(onStop(peer), timeout)
}
def reconnecting: Receive = LoggingReceive {
@ -158,7 +192,7 @@ case class P2PClientActor(
handleNodeCommand(cmd = P2PClient.CloseAnyStateCommand,
peerConnectionOpt = None)
case ExpectResponseCommand(msg) =>
handleExpectResponse(msg)
Await.result(handleExpectResponse(msg), timeout)
case metaMsg: P2PClient.MetaMsg =>
sender() ! handleMetaMsg(metaMsg)
}
@ -169,7 +203,7 @@ case class P2PClientActor(
handleNodeCommand(cmd = P2PClient.CloseAnyStateCommand,
peerConnectionOpt = None)
case ExpectResponseCommand(msg) =>
handleExpectResponse(msg)
Await.result(handleExpectResponse(msg), timeout)
case Tcp.CommandFailed(c: Tcp.Connect) =>
val peerOrProxyAddress = c.remoteAddress
logger.debug(
@ -274,7 +308,9 @@ case class P2PClientActor(
state match {
case wait: Waiting =>
currentPeerMsgHandlerRecv.onResponseTimeout(wait.responseFor)
wait.timeout.cancel()
wait.expectedResponseCancellable.cancel()
case init: Initializing =>
init.initializationTimeoutCancellable.cancel()
case _ =>
}
@ -334,13 +370,15 @@ case class P2PClientActor(
case Tcp.ErrorClosed(cause) =>
logger.debug(
s"An error occurred in our connection with $peer, cause=$cause state=${currentPeerMsgHandlerRecv.state}")
currentPeerMsgHandlerRecv = currentPeerMsgHandlerRecv.disconnect()
currentPeerMsgHandlerRecv =
Await.result(currentPeerMsgHandlerRecv.disconnect(), timeout)
unalignedBytes
case closeCmd @ (Tcp.ConfirmedClosed | Tcp.Closed | Tcp.Aborted |
Tcp.PeerClosed) =>
logger.debug(
s"We've been disconnected by $peer command=${closeCmd} state=${currentPeerMsgHandlerRecv.state}")
currentPeerMsgHandlerRecv = currentPeerMsgHandlerRecv.disconnect()
currentPeerMsgHandlerRecv =
Await.result(currentPeerMsgHandlerRecv.disconnect(), timeout)
unalignedBytes
case Tcp.Received(byteString: ByteString) =>
@ -443,6 +481,8 @@ case class P2PClientActor(
peerConnectionOpt match {
case Some(peerConnection) =>
logger.debug(s"Disconnecting from peer $peer")
context become ignoreNetworkMessages(Some(peerConnection),
ByteVector.empty)
currentPeerMsgHandlerRecv =
currentPeerMsgHandlerRecv.initializeDisconnect()
peerConnection ! Tcp.Close
@ -454,10 +494,13 @@ case class P2PClientActor(
logger.debug(s"Received close any state for $peer")
peerConnectionOpt match {
case Some(peerConnection) =>
context become ignoreNetworkMessages(Some(peerConnection),
ByteVector.empty)
currentPeerMsgHandlerRecv =
currentPeerMsgHandlerRecv.initializeDisconnect()
peerConnection ! Tcp.Close
case None =>
context become ignoreNetworkMessages(None, ByteVector.empty)
currentPeerMsgHandlerRecv =
currentPeerMsgHandlerRecv.stopReconnect()
context.stop(self)
@ -465,9 +508,21 @@ case class P2PClientActor(
}
}
def handleExpectResponse(msg: NetworkPayload): Unit = {
currentPeerMsgHandlerRecv =
currentPeerMsgHandlerRecv.handleExpectResponse(msg)
/** For any [[org.bitcoins.core.p2p.NetworkPayload]], if it a subtype of [[ExpectsResponse]], starts a
* cancellable timer and changes state to [[PeerMessageReceiverState.Waiting]]. Note that any messages, received
* while waiting for a particular message are still processed, they just won't cancel the scheduled query timeout.
* Can only wait for one query at a time, other messages that are [[ExpectsResponse]] and received while in
* [[PeerMessageReceiverState.Waiting]] are still sent, but no query timeout would be executed for those.
* Currently, such a situation is not meant to happen.
*/
def handleExpectResponse(msg: NetworkPayload): Future[Unit] = {
require(
msg.isInstanceOf[ExpectsResponse],
s"Tried to wait for response to message which is not a query, got=$msg")
logger.info(s"Expecting response for ${msg.commandName} for $peer")
currentPeerMsgHandlerRecv.handleExpectResponse(msg).map { newReceiver =>
currentPeerMsgHandlerRecv = newReceiver
}
}
}
@ -559,22 +614,28 @@ object P2PClient extends P2PLogger {
config)
def apply(
context: ActorRefFactory,
peer: Peer,
peerMessageReceiver: PeerMessageReceiver,
onReconnect: Peer => Future[Unit],
onStop: Peer => Future[Unit],
maxReconnectionTries: Int = 16)(implicit
config: NodeAppConfig): P2PClient = {
val actorRef = context.actorOf(props = props(peer,
peerMessageReceiver,
onReconnect,
onStop,
maxReconnectionTries),
name =
BitcoinSNodeUtil.createActorName(getClass))
maxReconnectionTries: Int = 16,
supervisor: ActorRef)(implicit
config: NodeAppConfig,
system: ActorSystem): Future[P2PClient] = {
P2PClient(actorRef, peer)
val clientProps = props(peer,
peerMessageReceiver,
onReconnect,
onStop,
maxReconnectionTries)
import system.dispatcher
implicit val timeout: Timeout = Timeout(10.second)
val actorRefF = supervisor ? clientProps
actorRefF.map { actorRef =>
P2PClient(actorRef.asInstanceOf[ActorRef], peer)
}
}
/** Akka sends messages as one byte stream. There is not a 1 to 1 relationship between byte streams received and

View File

@ -0,0 +1,23 @@
package org.bitcoins.node.networking
import akka.actor.SupervisorStrategy._
import akka.actor.{Actor, OneForOneStrategy, Props}
import org.bitcoins.node.P2PLogger
import org.bitcoins.node.util.BitcoinSNodeUtil
class P2PClientSupervisor extends Actor with P2PLogger {
override val supervisorStrategy: OneForOneStrategy =
OneForOneStrategy() { case e: Throwable =>
logger.error("Error while processing messages", e)
Stop
}
def receive: Receive = { case props: Props =>
/* actors to be supervised need to built withing this context this creates an actor using props and sends back
the ActorRef */
sender() ! context.actorOf(props,
name =
BitcoinSNodeUtil.createActorName(getClass))
}
}

View File

@ -9,7 +9,8 @@ import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.networking.peer.PeerMessageReceiverState._
import org.bitcoins.node.{Node, P2PLogger}
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
/** Responsible for receiving messages from a peer on the
* p2p network. This is called by [[org.bitcoins.rpc.client.common.Client Client]] when doing the p2p
@ -44,11 +45,12 @@ class PeerMessageReceiver(
case Preconnection =>
logger.debug(s"Connection established with peer=${peer}")
val timeout =
val initializationTimeoutCancellable =
system.scheduler.scheduleOnce(nodeAppConfig.initializationTimeout)(
onInitTimeout())
Await.result(onInitTimeout(), 10.seconds))
val newState = Preconnection.toInitializing(client, timeout)
val newState =
Preconnection.toInitializing(client, initializationTimeoutCancellable)
val peerMsgSender = PeerMessageSender(client)
@ -91,7 +93,6 @@ class PeerMessageReceiver(
state.verackMsgP)
toState(newState)
case state: Waiting =>
onResponseTimeout(state.responseFor)
val newState = InitializedDisconnect(state.clientConnectP,
state.clientDisconnectP,
state.versionMsgP,
@ -121,7 +122,7 @@ class PeerMessageReceiver(
}
}
protected[networking] def disconnect(): PeerMessageReceiver = {
protected[networking] def disconnect(): Future[PeerMessageReceiver] = {
logger.trace(s"Disconnecting with internalstate=${state}")
state match {
case bad @ (_: Disconnected | Preconnection |
@ -134,15 +135,19 @@ class PeerMessageReceiver(
clientDisconnectP = good.clientDisconnectP.success(()),
versionMsgP = good.versionMsgP,
verackMsgP = good.verackMsgP)
new PeerMessageReceiver(node, newState, peer)
val newReceiver = new PeerMessageReceiver(node, newState, peer)
Future.successful(newReceiver)
case good @ (_: Initializing | _: Normal | _: Waiting) =>
good match {
val handleF: Future[Unit] = good match {
case wait: Waiting =>
onResponseTimeout(wait.responseFor)
wait.timeout.cancel()
onResponseTimeout(wait.responseFor).map { _ =>
wait.expectedResponseCancellable.cancel()
()
}
case wait: Initializing =>
wait.timeout.cancel()
case _ =>
wait.initializationTimeoutCancellable.cancel()
Future.unit
case _ => Future.unit
}
logger.debug(s"Disconnected bitcoin peer=${peer}")
@ -153,7 +158,8 @@ class PeerMessageReceiver(
verackMsgP = good.verackMsgP
)
new PeerMessageReceiver(node, newState, peer)
val newReceiver = new PeerMessageReceiver(node, newState, peer)
handleF.map(_ => newReceiver)
}
}
@ -192,7 +198,7 @@ class PeerMessageReceiver(
val timeTaken = System.currentTimeMillis() - state.waitingSince
logger.debug(
s"Received expected response ${payload.commandName} in $timeTaken ms")
state.timeout.cancel()
state.expectedResponseCancellable.cancel()
val newState = Normal(state.clientConnectP,
state.clientDisconnectP,
state.versionMsgP,
@ -201,7 +207,7 @@ class PeerMessageReceiver(
} else this
case state: Initializing =>
if (payload == VerAckMessage)
state.timeout.cancel()
state.initializationTimeoutCancellable.cancel()
this
case _ => this
}
@ -253,39 +259,40 @@ class PeerMessageReceiver(
.handleControlPayload(payload, sender, peer, curReceiver)
}
def onInitTimeout(): Unit = {
def onInitTimeout(): Future[Unit] = {
logger.debug(s"Init timeout for peer $peer")
node.peerManager.onInitializationTimeout(peer)
}
def onResponseTimeout(networkPayload: NetworkPayload): Unit = {
def onResponseTimeout(networkPayload: NetworkPayload): Future[Unit] = {
assert(networkPayload.isInstanceOf[ExpectsResponse])
logger.debug(s"Handling response timeout for ${networkPayload.commandName}")
//isn't this redundant? No, on response timeout may be called when not cancel timeout
state match {
case wait: Waiting => wait.timeout.cancel()
case wait: Waiting => wait.expectedResponseCancellable.cancel()
case _ =>
}
networkPayload match {
case payload: ExpectsResponse =>
logger.debug(
s"Response for ${payload.commandName} from $peer timed out.")
node.peerManager.onQueryTimeout(payload, peer).foreach(_ => ())
s"Response for ${payload.commandName} from $peer timed out in state $state")
node.peerManager.onQueryTimeout(payload, peer)
case _ =>
logger.error(
s"onResponseTimeout called for ${networkPayload.commandName} which does not expect response")
Future.unit
}
}
def handleExpectResponse(msg: NetworkPayload): PeerMessageReceiver = {
def handleExpectResponse(msg: NetworkPayload): Future[PeerMessageReceiver] = {
state match {
case good: Normal =>
logger.debug(s"Handling expected response for ${msg.commandName}")
val timeout =
val expectedResponseCancellable =
system.scheduler.scheduleOnce(nodeAppConfig.queryWaitTime)(
onResponseTimeout(msg))
Await.result(onResponseTimeout(msg), 10.seconds))
val newState = Waiting(
clientConnectP = good.clientConnectP,
clientDisconnectP = good.clientDisconnectP,
@ -293,13 +300,13 @@ class PeerMessageReceiver(
verackMsgP = good.verackMsgP,
responseFor = msg,
waitingSince = System.currentTimeMillis(),
timeout = timeout
expectedResponseCancellable = expectedResponseCancellable
)
toState(newState)
Future.successful(toState(newState))
case state: Waiting =>
logger.debug(
s"Waiting for response to ${state.responseFor.commandName}. Ignoring next request for ${msg.commandName}")
this
Future.successful(this)
case bad @ (_: InitializedDisconnect | _: InitializedDisconnectDone |
_: StoppedReconnect) =>
throw new RuntimeException(
@ -307,9 +314,7 @@ class PeerMessageReceiver(
case Preconnection | _: Initializing | _: Disconnected =>
//so we sent a message when things were good, but not we are back to connecting?
//can happen when can happen where once we initialize the remote peer immediately disconnects us
onResponseTimeout(msg)
this
onResponseTimeout(msg).flatMap(_ => Future.successful(this))
}
}

View File

@ -101,7 +101,7 @@ object PeerMessageReceiverState {
versionMsgP = versionMsgP,
verackMsgP = verackMsgP,
waitingSince = System.currentTimeMillis(),
timeout = timeout
initializationTimeoutCancellable = timeout
)
}
}
@ -117,7 +117,7 @@ object PeerMessageReceiverState {
versionMsgP: Promise[VersionMessage],
verackMsgP: Promise[VerAckMessage.type],
waitingSince: Long,
timeout: Cancellable
initializationTimeoutCancellable: Cancellable
) extends PeerMessageReceiverState {
require(
isConnected,
@ -134,7 +134,7 @@ object PeerMessageReceiverState {
versionMsgP = versionMsgP.success(versionMsg),
verackMsgP = verackMsgP,
waitingSince = waitingSince,
timeout = timeout
initializationTimeoutCancellable = initializationTimeoutCancellable
)
}
@ -142,7 +142,7 @@ object PeerMessageReceiverState {
* our [[org.bitcoins.node.networking.peer.PeerMessageReceiverState PeerMessageReceiverState]] to [[org.bitcoins.node.networking.peer.PeerMessageReceiverState.Normal PeerMessageReceiverState.Normal]]
*/
def toNormal(verAckMessage: VerAckMessage.type): Normal = {
timeout.cancel()
initializationTimeoutCancellable.cancel()
Normal(
clientConnectP = clientConnectP,
clientDisconnectP = clientDisconnectP,
@ -189,6 +189,9 @@ object PeerMessageReceiverState {
override def toString: String = "InitializedDisconnect"
}
/** State when waiting for response to a message of type [[org.bitcoins.core.p2p.ExpectsResponse]]. Other messages
* are still processed and receiver will continue waiting until timeout.
*/
case class Waiting(
clientConnectP: Promise[P2PClient],
clientDisconnectP: Promise[Unit],
@ -196,7 +199,7 @@ object PeerMessageReceiverState {
verackMsgP: Promise[VerAckMessage.type],
responseFor: NetworkPayload,
waitingSince: Long,
timeout: Cancellable)
expectedResponseCancellable: Cancellable)
extends PeerMessageReceiverState {
override def toString: String = "Waiting"
}

View File

@ -16,7 +16,6 @@ import org.bitcoins.node.P2PLogger
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.constant.NodeConstants
import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.networking.P2PClient.ExpectResponseCommand
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
@ -244,14 +243,6 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
logger.debug(s"Sending msg=${msg.commandName} to peer=${socket}")
val networkMsg = NetworkMessage(conf.network, msg)
client.actor ! networkMsg
msg match {
case _: ExpectsResponse =>
logger.debug(s"${msg.commandName} expects response")
client.actor ! ExpectResponseCommand(msg)
case _ =>
}
Future.unit
}
}

View File

@ -62,6 +62,9 @@
<!-- inspect TCP details -->
<logger name="org.bitcoins.node.networking.P2PClientActor" level="WARN"/>
<!-- See exceptions thrown in actor-->
<logger name="org.bitcoins.node.networking.P2PClientSupervisor" level="WARN"/>
<!-- ╔════════════════════╗ -->
<!-- ║ Chain module ║ -->
<!-- ╚════════════════════╝ -->

View File

@ -1,6 +1,6 @@
package org.bitcoins.testkit.node
import akka.actor.{ActorRefFactory, ActorSystem}
import akka.actor.{ActorRef, ActorSystem}
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
@ -18,16 +18,19 @@ import scala.concurrent.{ExecutionContext, Future}
abstract class NodeTestUtil extends P2PLogger {
def client(peer: Peer, peerMsgReceiver: PeerMessageReceiver)(implicit
ref: ActorRefFactory,
conf: NodeAppConfig
): P2PClient = {
P2PClient.apply(ref,
peer,
def client(
peer: Peer,
peerMsgReceiver: PeerMessageReceiver,
supervisor: ActorRef)(implicit
conf: NodeAppConfig,
system: ActorSystem
): Future[P2PClient] = {
P2PClient.apply(peer,
peerMsgReceiver,
(_: Peer) => Future.unit,
(_: Peer) => Future.unit,
16)
16,
supervisor)
}
/** Helper method to get the [[java.net.InetSocketAddress]]

View File

@ -221,14 +221,13 @@ object NodeUnitTest extends P2PLogger {
system: ActorSystem): Future[PeerHandler] = {
import system.dispatcher
val nodeF = buildNode(peer, walletCreationTimeOpt)
val peerMsgReceiverF = nodeF.map { node =>
PeerMessageReceiver.preConnection(peer, node)
}
//the problem here is the 'self', this needs to be an ordinary peer message handler
//that can handle the handshake
val peerHandlerF = for {
peerMsgReceiver <- peerMsgReceiverF
client = NodeTestUtil.client(peer, peerMsgReceiver)
node <- nodeF
peerMsgReceiver = PeerMessageReceiver.preConnection(peer, node)
supervisor = node.peerManager.supervisor
client <- NodeTestUtil.client(peer, peerMsgReceiver, supervisor)
peerMsgSender = PeerMessageSender(client)
} yield PeerHandler(client, peerMsgSender)