Fix ReConnectionTest part 2 (#5074)

* Fix ReConnectionTest

* Cleanup

* Revert logback-test.xml

* Fix connectioncount test case

* Get P2PClientTest passing consistently

* Empty commit to re-run CI
This commit is contained in:
Chris Stewart 2023-05-22 08:11:57 -05:00 committed by GitHub
parent c0403da7c6
commit 8e4aa49aef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 61 additions and 39 deletions

View File

@ -356,13 +356,20 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
initConnectionCount <- node.getConnectionCount
_ = assert(initConnectionCount == 2)
nodeUri0 <- NodeTestUtil.getNodeURIFromBitcoind(bitcoinds(0))
peer0 <- NodeTestUtil.getBitcoindPeer(bitcoinds(0))
_ <- bitcoinds(0).disconnectNode(nodeUri0)
_ <- AsyncUtil.nonBlockingSleep(1.seconds)
_ <- AsyncUtil.retryUntilSatisfiedF(() =>
node.peerManager.isDisconnected(peer0))
onePeerConnectionCount <- node.getConnectionCount
_ = assert(onePeerConnectionCount == 1)
nodeUri1 <- NodeTestUtil.getNodeURIFromBitcoind(bitcoinds(1))
_ <- bitcoinds(1).disconnectNode(nodeUri1)
_ <- AsyncUtil.nonBlockingSleep(1.seconds)
peer1 <- NodeTestUtil.getBitcoindPeer(bitcoinds(1))
_ <- AsyncUtil.retryUntilSatisfiedF(() =>
node.peerManager.isDisconnected(peer1))
//there is a race condition here between checking the connection count
//and the reconnection logic attempting to reconnect us to this peer
//since we don't want our node to be connected to nothing
zeroPeerConnectionCount <- node.getConnectionCount
} yield assert(zeroPeerConnectionCount == 0)
}

View File

@ -100,6 +100,9 @@ class P2PClientActorTest
val peerMessageReceiverF =
for {
node <- NodeUnitTest.buildNode(peer, None)
//piggy back off of node infra to setup p2p clients, but don't actually use
//the node itself so stop it here an clean up resources allocated by it
_ <- node.stop()
} yield PeerMessageReceiver(
controlMessageHandler = node.controlMessageHandler,
dataMessageHandler = node.peerManager.getDataMessageHandler,

View File

@ -1,21 +1,15 @@
package org.bitcoins.node.networking
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.node.networking.peer.PeerHandler
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.node.fixture.NeutrinoNodeConnectedWithBitcoind
import org.bitcoins.testkit.node.{
NodeTestUtil,
NodeTestWithCachedBitcoindNewest,
NodeUnitTest
NodeTestWithCachedBitcoindNewest
}
import org.bitcoins.testkit.util.AkkaUtil
import org.scalatest.FutureOutcome
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
class ReConnectionTest extends NodeTestWithCachedBitcoindNewest {
override protected def getFreshConfig: BitcoinSAppConfig =
@ -41,27 +35,20 @@ class ReConnectionTest extends NodeTestWithCachedBitcoindNewest {
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoind =>
val bitcoind = nodeConnectedWithBitcoind.bitcoind
val node = nodeConnectedWithBitcoind.node
val peerF = NodeTestUtil.getBitcoindPeer(bitcoind)
val peerHandlerF: Future[PeerHandler] = peerF.flatMap { peer =>
NodeUnitTest.buildPeerHandler(peer, None)(node.nodeConfig,
node.chainConfig,
system)
}
val connectedF = for {
_ <- node.start()
peerHandler <- peerHandlerF
_ = peerHandler.peerMsgSender.connect()
_ <- AsyncUtil.retryUntilSatisfiedF(() =>
peerHandler.p2pClient.isInitialized())
//wait until we are fully connected before continuing test
_ <- AsyncUtil.retryUntilSatisfiedF(conditionF = () =>
node.getConnectionCount.map(_ == 1))
nodeUri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoind)
peer <- NodeTestUtil.getBitcoindPeer(bitcoind)
_ <- bitcoind.disconnectNode(nodeUri)
_ <- AkkaUtil.nonBlockingSleep(2.seconds) //time to ensure disconnection
//now we should eventually automatically reconnect
_ <- AsyncUtil.retryUntilSatisfiedF(
conditionF = () => peerHandler.p2pClient.isConnected(),
interval = 500.millis,
maxTries = 60)
_ <- AsyncUtil.retryUntilSatisfiedF(() =>
node.peerManager.isDisconnected(peer))
//make sure we re-connect
_ <- AsyncUtil.retryUntilSatisfiedF(conditionF = () =>
node.peerManager.isConnected(peer))
} yield succeed
connectedF

View File

@ -1,7 +1,6 @@
package org.bitcoins.node
import akka.actor.{ActorRef, ActorSystem, Cancellable}
import monix.execution.atomic.AtomicBoolean
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.p2p.ServiceIdentifier
@ -10,6 +9,7 @@ import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.{Peer, PeerDAO, PeerDb}
import java.net.{InetAddress, UnknownHostException}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
@ -28,6 +28,8 @@ case class PeerFinder(
extends StartStopAsync[PeerFinder]
with P2PLogger {
private val isStarted: AtomicBoolean = new AtomicBoolean(false)
/** Returns peers by querying each dns seed once. These will be IPv4 addresses. */
private def getPeersFromDnsSeeds: Vector[Peer] = {
val dnsSeeds = nodeAppConfig.network.dnsSeeds
@ -39,7 +41,7 @@ case class PeerFinder(
} catch {
case _: UnknownHostException =>
logger.debug(s"DNS seed $seed is unavailable.")
Vector()
Vector.empty
}
})
.distinct
@ -119,17 +121,14 @@ case class PeerFinder(
private val initialDelay: FiniteDuration = 30.minute
private val isConnectionSchedulerRunning = AtomicBoolean(false)
private val isConnectionSchedulerRunning = new AtomicBoolean(false)
private lazy val peerConnectionScheduler: Cancellable =
system.scheduler.scheduleWithFixedDelay(
initialDelay = initialDelay,
delay = nodeAppConfig.tryNextPeersInterval) { () =>
{
if (
isConnectionSchedulerRunning.compareAndSet(expect = false,
update = true)
) {
if (isConnectionSchedulerRunning.compareAndSet(false, true)) {
logger.info(s"Querying p2p network for peers...")
logger.debug(s"Cache size: ${_peerData.size}. ${_peerData.keys}")
if (_peersToTry.size < 32)
@ -156,7 +155,7 @@ case class PeerFinder(
override def start(): Future[PeerFinder] = {
logger.debug(s"Starting PeerFinder")
isStarted.set(true)
val peersToTry = (getPeersFromParam ++ getPeersFromConfig).distinct
val initPeerF = Future.traverse(peersToTry)(tryPeer)
@ -183,7 +182,19 @@ case class PeerFinder(
initPeerF.flatMap(_ => peerDiscoveryF)
}
def reconnect(peer: Peer): Future[Unit] = {
logger.info(s"Attempting to reconnect peer=$peer")
if (isStarted.get) {
tryToReconnectPeer(peer)
} else {
logger.error(
s"Ignoring reconnect attempt to peer=$peer as PeerFinder is not started")
Future.unit
}
}
override def stop(): Future[PeerFinder] = {
isStarted.set(false)
//stop scheduler
peerConnectionScheduler.cancel()
//delete try queue
@ -198,7 +209,9 @@ case class PeerFinder(
maxTries = 30)
.map(_ => this)
closeF.flatMap(_ => waitStopF)
closeF.flatMap { _ =>
waitStopF
}
}
/** creates and initialises a new test peer */
@ -207,6 +220,11 @@ case class PeerFinder(
_peerData(peer).peerMessageSender.map(_.connect())
}
private def tryToReconnectPeer(peer: Peer): Future[Unit] = {
_peerData.put(peer, PeerData(peer, node, supervisor))
_peerData(peer).peerMessageSender.map(_.reconnect())
}
def removePeer(peer: Peer): Unit = {
logger.debug(s"Removing peer $peer")
_peerData.remove(peer)

View File

@ -397,7 +397,7 @@ case class PeerManager(
stopF.failed.foreach { e =>
logger.error(
s"Failed to stop peer manager. Peers: $peers, waiting for deletion: $waitingForDeletion",
s"Failed to stop peer manager. Peers: ${_peerDataMap.map(_._1)}, waiting for deletion: $waitingForDeletion",
e)
}
@ -410,6 +410,10 @@ case class PeerManager(
else Future.successful(false)
}
def isDisconnected(peer: Peer): Future[Boolean] = {
isConnected(peer).map(b => !b)
}
def isInitialized(peer: Peer): Future[Boolean] = {
if (peerDataMap.contains(peer))
peerDataMap(peer).peerMessageSender.flatMap(_.isInitialized())
@ -486,7 +490,6 @@ case class PeerManager(
for {
_ <- sendAddrReq
peerData = finder.getData(peer).get
_ <- createInDb(peer, peerData.serviceIdentifier)
_ <- managePeerF()
} yield ()
@ -533,8 +536,7 @@ case class PeerManager(
s"No new peers to sync from, cannot start new sync. Terminated sync with peer=$peer current syncPeer=$syncPeerOpt state=${state}")
Future.failed(exn)
} else {
//means we are DoneSyncing, so no need to start syncing from a new peer
Future.unit
finder.reconnect(peer)
}
} else if (waitingForDeletion.contains(peer)) {
//a peer we wanted to disconnect has remove has stopped the client actor, finally mark this as deleted

View File

@ -2,6 +2,7 @@ package org.bitcoins.node.networking
import akka.actor.SupervisorStrategy._
import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.event.LoggingReceive
import org.bitcoins.node.P2PLogger
import org.bitcoins.node.util.BitcoinSNodeUtil
@ -13,7 +14,7 @@ class P2PClientSupervisor extends Actor with P2PLogger {
Stop
}
def receive: Receive = { case props: Props =>
override def receive: Receive = LoggingReceive { case props: Props =>
/* actors to be supervised need to built withing this context this creates an actor using props and sends back
the ActorRef */
sender() ! context.actorOf(props,

View File

@ -30,6 +30,10 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
client.actor ! P2PClient.ConnectCommand
}
def reconnect(): Unit = {
client.actor ! P2PClient.ReconnectCommand
}
def isConnected()(implicit ec: ExecutionContext): Future[Boolean] = {
client.isConnected()
}