From 2d57ff6a3e1cb36318a692a426efe54dc648c671 Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Tue, 25 Jul 2023 11:59:58 -0500 Subject: [PATCH] Refactor `PeerManager` methods to be `connectPeer()`, `disconnectPeer()` (#5163) * Refactor PeerManager.{removePeer,waitingForDeletion} -> PeerManager.{disconnectPeer,waitingForDisconnection} * PeerManager.addPeer() -> PeerManager.connectPeer() * scalafmt --- .../org/bitcoins/node/NeutrinoNodeTest.scala | 4 +- .../scala/org/bitcoins/node/PeerManager.scala | 39 +++++++++---------- 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala index 356b72bf49..3f71e683dd 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala @@ -98,7 +98,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair { .map(p => !peerManager .getPeerData(p) - .isDefined && !peerManager.waitingForDeletion + .isDefined && !peerManager.waitingForDisconnection .contains(p)) .forall(_ == true), maxTries = 5, @@ -110,7 +110,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair { _ <- bothOurs _ <- allConnected _ <- allInitialized - _ <- Future.sequence(peers.map(peerManager.removePeer)) + _ <- Future.sequence(peers.map(peerManager.disconnectPeer)) _ <- allDisconn } yield { succeed diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index 6da81a92eb..eff5cba1f8 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -64,8 +64,8 @@ case class PeerManager( private val _peerDataMap: mutable.Map[Peer, PeerData] = mutable.Map.empty /** holds peers removed from peerData whose client actors are not stopped yet. Used for runtime sanity checks. */ - private val _waitingForDeletion: mutable.Set[Peer] = mutable.Set.empty - def waitingForDeletion: Set[Peer] = _waitingForDeletion.toSet + private val _waitingForDisconnection: mutable.Set[Peer] = mutable.Set.empty + def waitingForDisconnection: Set[Peer] = _waitingForDisconnection.toSet private[this] var finderOpt: Option[PeerFinder] = { None @@ -82,7 +82,7 @@ case class PeerManager( def connectedPeerCount: Int = _peerDataMap.size - private def addPeer(peer: Peer): Future[Unit] = { + private def connectPeer(peer: Peer): Future[Unit] = { finderOpt match { case Some(finder) => require(finder.hasPeer(peer), s"Unknown $peer marked as usable") @@ -95,8 +95,9 @@ case class PeerManager( s"Connected to peer $peer $hasCf. Connected peer count $connectedPeerCount") Future.unit case None => - sys.error( - s"Cannot addPeer, finder not started. Call PeerManager.start()") + val exn = new RuntimeException( + s"Cannot connectPeer, finder not started. Call PeerManager.start()") + Future.failed(exn) } } @@ -335,29 +336,25 @@ case class PeerManager( assert(!peerDataMap(replacePeer).serviceIdentifier.nodeCompactFilters, s"$replacePeer has cf") for { - _ <- removePeer(replacePeer) - _ <- addPeer(withPeer) + _ <- disconnectPeer(replacePeer) + _ <- connectPeer(withPeer) } yield { () } } - def removePeer(peer: Peer): Future[Unit] = { - logger.debug(s"Removing persistent peer $peer") + def disconnectPeer(peer: Peer): Future[Unit] = { + logger.debug(s"Disconnecting persistent peer=$peer") val client: PeerData = peerDataMap(peer) _peerDataMap.remove(peer) //so we need to remove if from the map for connected peers so no more request could be sent to it but we before //the actor is stopped we don't delete it to ensure that no such case where peers is deleted but actor not stopped //leading to a memory leak may happen - _waitingForDeletion.add(peer) + _waitingForDisconnection.add(peer) //now send request to stop actor which will be completed some time in future client.stop() } - def isReconnection(peer: Peer): Boolean = { - peerDataMap.contains(peer) - } - override def start(): Future[PeerManager] = { logger.debug(s"Starting PeerManager") val (queue, source) = dataMessageStreamSource.preMaterialize() @@ -404,9 +401,9 @@ case class PeerManager( val stopF = for { _ <- finderStopF - _ <- Future.traverse(peers)(removePeer) + _ <- Future.traverse(peers)(disconnectPeer) _ <- AsyncUtil.retryUntilSatisfied( - _peerDataMap.isEmpty && waitingForDeletion.isEmpty, + _peerDataMap.isEmpty && waitingForDisconnection.isEmpty, interval = 1.seconds, maxTries = 30 ) @@ -503,7 +500,7 @@ case class PeerManager( def managePeerF(): Future[Unit] = { //if we have slots remaining, connect if (connectedPeerCount < nodeAppConfig.maxConnectedPeers) { - addPeer(peer) + connectPeer(peer) } else { lazy val notCf = peerDataMap .filter(p => !p._2.serviceIdentifier.nodeCompactFilters) @@ -619,9 +616,9 @@ case class PeerManager( Future.successful(state) } } - } else if (waitingForDeletion.contains(peer)) { + } else if (waitingForDisconnection.contains(peer)) { //a peer we wanted to disconnect has remove has stopped the client actor, finally mark this as deleted - _waitingForDeletion.remove(peer) + _waitingForDisconnection.remove(peer) Future.successful(state) } else { logger.warn(s"onP2PClientStopped called for unknown $peer") @@ -786,12 +783,12 @@ case class PeerManager( case m: MisbehavingPeer => //disconnect the misbehaving peer for { - _ <- removePeer(m.badPeer) + _ <- disconnectPeer(m.badPeer) _ <- syncFromNewPeer() } yield newDmh case removePeers: RemovePeers => for { - _ <- Future.traverse(removePeers.peers)(removePeer) + _ <- Future.traverse(removePeers.peers)(disconnectPeer) } yield newDmh case _: SyncNodeState | DoneSyncing(_) => Future.successful(newDmh)