Refactor PeerManager methods to be connectPeer(), disconnectPeer() (#5163)

* Refactor PeerManager.{removePeer,waitingForDeletion} -> PeerManager.{disconnectPeer,waitingForDisconnection}

* PeerManager.addPeer() -> PeerManager.connectPeer()

* scalafmt
This commit is contained in:
Chris Stewart 2023-07-25 11:59:58 -05:00 committed by GitHub
parent 4afaaf8f22
commit 2d57ff6a3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 23 deletions

View File

@ -98,7 +98,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
.map(p =>
!peerManager
.getPeerData(p)
.isDefined && !peerManager.waitingForDeletion
.isDefined && !peerManager.waitingForDisconnection
.contains(p))
.forall(_ == true),
maxTries = 5,
@ -110,7 +110,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
_ <- bothOurs
_ <- allConnected
_ <- allInitialized
_ <- Future.sequence(peers.map(peerManager.removePeer))
_ <- Future.sequence(peers.map(peerManager.disconnectPeer))
_ <- allDisconn
} yield {
succeed

View File

@ -64,8 +64,8 @@ case class PeerManager(
private val _peerDataMap: mutable.Map[Peer, PeerData] = mutable.Map.empty
/** holds peers removed from peerData whose client actors are not stopped yet. Used for runtime sanity checks. */
private val _waitingForDeletion: mutable.Set[Peer] = mutable.Set.empty
def waitingForDeletion: Set[Peer] = _waitingForDeletion.toSet
private val _waitingForDisconnection: mutable.Set[Peer] = mutable.Set.empty
def waitingForDisconnection: Set[Peer] = _waitingForDisconnection.toSet
private[this] var finderOpt: Option[PeerFinder] = {
None
@ -82,7 +82,7 @@ case class PeerManager(
def connectedPeerCount: Int = _peerDataMap.size
private def addPeer(peer: Peer): Future[Unit] = {
private def connectPeer(peer: Peer): Future[Unit] = {
finderOpt match {
case Some(finder) =>
require(finder.hasPeer(peer), s"Unknown $peer marked as usable")
@ -95,8 +95,9 @@ case class PeerManager(
s"Connected to peer $peer $hasCf. Connected peer count $connectedPeerCount")
Future.unit
case None =>
sys.error(
s"Cannot addPeer, finder not started. Call PeerManager.start()")
val exn = new RuntimeException(
s"Cannot connectPeer, finder not started. Call PeerManager.start()")
Future.failed(exn)
}
}
@ -335,29 +336,25 @@ case class PeerManager(
assert(!peerDataMap(replacePeer).serviceIdentifier.nodeCompactFilters,
s"$replacePeer has cf")
for {
_ <- removePeer(replacePeer)
_ <- addPeer(withPeer)
_ <- disconnectPeer(replacePeer)
_ <- connectPeer(withPeer)
} yield {
()
}
}
def removePeer(peer: Peer): Future[Unit] = {
logger.debug(s"Removing persistent peer $peer")
def disconnectPeer(peer: Peer): Future[Unit] = {
logger.debug(s"Disconnecting persistent peer=$peer")
val client: PeerData = peerDataMap(peer)
_peerDataMap.remove(peer)
//so we need to remove if from the map for connected peers so no more request could be sent to it but we before
//the actor is stopped we don't delete it to ensure that no such case where peers is deleted but actor not stopped
//leading to a memory leak may happen
_waitingForDeletion.add(peer)
_waitingForDisconnection.add(peer)
//now send request to stop actor which will be completed some time in future
client.stop()
}
def isReconnection(peer: Peer): Boolean = {
peerDataMap.contains(peer)
}
override def start(): Future[PeerManager] = {
logger.debug(s"Starting PeerManager")
val (queue, source) = dataMessageStreamSource.preMaterialize()
@ -404,9 +401,9 @@ case class PeerManager(
val stopF = for {
_ <- finderStopF
_ <- Future.traverse(peers)(removePeer)
_ <- Future.traverse(peers)(disconnectPeer)
_ <- AsyncUtil.retryUntilSatisfied(
_peerDataMap.isEmpty && waitingForDeletion.isEmpty,
_peerDataMap.isEmpty && waitingForDisconnection.isEmpty,
interval = 1.seconds,
maxTries = 30
)
@ -503,7 +500,7 @@ case class PeerManager(
def managePeerF(): Future[Unit] = {
//if we have slots remaining, connect
if (connectedPeerCount < nodeAppConfig.maxConnectedPeers) {
addPeer(peer)
connectPeer(peer)
} else {
lazy val notCf = peerDataMap
.filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
@ -619,9 +616,9 @@ case class PeerManager(
Future.successful(state)
}
}
} else if (waitingForDeletion.contains(peer)) {
} else if (waitingForDisconnection.contains(peer)) {
//a peer we wanted to disconnect has remove has stopped the client actor, finally mark this as deleted
_waitingForDeletion.remove(peer)
_waitingForDisconnection.remove(peer)
Future.successful(state)
} else {
logger.warn(s"onP2PClientStopped called for unknown $peer")
@ -786,12 +783,12 @@ case class PeerManager(
case m: MisbehavingPeer =>
//disconnect the misbehaving peer
for {
_ <- removePeer(m.badPeer)
_ <- disconnectPeer(m.badPeer)
_ <- syncFromNewPeer()
} yield newDmh
case removePeers: RemovePeers =>
for {
_ <- Future.traverse(removePeers.peers)(removePeer)
_ <- Future.traverse(removePeers.peers)(disconnectPeer)
} yield newDmh
case _: SyncNodeState | DoneSyncing(_) =>
Future.successful(newDmh)