From ab0b0e22098613b4f03037165a82363d25f15716 Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Tue, 27 Feb 2024 15:38:16 -0600 Subject: [PATCH] Fix bug where we weren't removing peer from `NodeRunningState.waitingForDisconnection` (#5425) * Fix bug where we weren't removing peer from NodeRunningState.waitingForDisconnection * Empty commit to re-run CI * Remove _peerDataMap.remove(i.peer) * Empty commit to re-run CI * Replace state even if client.stop() results in a failed Future --- .../scala/org/bitcoins/node/PeerManager.scala | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index 0a1f2990a3..98605adfeb 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -1,6 +1,7 @@ package org.bitcoins.node import grizzled.slf4j.Logging +import org.apache.pekko.Done import org.apache.pekko.actor.{ActorSystem, Cancellable} import org.apache.pekko.stream.scaladsl.{Sink, SourceQueue} import org.bitcoins.asyncutil.AsyncUtil @@ -334,9 +335,11 @@ case class PeerManager( Future.successful(state) } else if (peerDataMap.contains(peer)) { _peerDataMap.remove(peer) + val rm = state.waitingForDisconnection.-(peer) + val rmWaitingForDisconnect = state.replaceWaitingForDisconnection(rm) val isShuttingDown = state.isInstanceOf[NodeShuttingDown] if (state.peers.exists(_ != peer)) { - state match { + rmWaitingForDisconnect match { case s: SyncNodeState => switchSyncToRandomPeer(s, Some(peer)) case d: DoneSyncing => //defensively try to sync with the new peer @@ -355,16 +358,16 @@ case class PeerManager( } else { if (forceReconnect && !isShuttingDown) { - finder.reconnect(peer).map(_ => state) + finder.reconnect(peer).map(_ => rmWaitingForDisconnect) } else if (!isShuttingDown) { logger.info( s"No new peers to connect to, querying for new connections... state=${state} peers=$peers") finder.queryForPeerConnections(Set(peer)) match { - case Some(_) => Future.successful(state) + case Some(_) => Future.successful(rmWaitingForDisconnect) case None => logger.debug( s"Could not query for more peer connections as previous job is still running") - Future.successful(state) + Future.successful(rmWaitingForDisconnect) } } else { //if shutting down, do nothing @@ -373,7 +376,7 @@ case class PeerManager( } } else if (state.waitingForDisconnection.contains(peer)) { //a peer we wanted to disconnect has remove has stopped the client actor, finally mark this as deleted - val removed = state.waitingForDisconnection.removedAll(Set(peer)) + val removed = state.waitingForDisconnection.-(peer) val newState = state.replaceWaitingForDisconnection(removed) newState match { case s: SyncNodeState => @@ -592,12 +595,18 @@ case class PeerManager( //leading to a memory leak may happen //now send request to stop actor which will be completed some time in future - client.stop().map { _ => - val newWaiting = r.waitingForDisconnection.+(i.peer) - val newPdm = r.peerDataMap.filterNot(_._1.peer == i.peer) - val newState = r - .replaceWaitingForDisconnection(newWaiting) - .replacePeers(newPdm) + val _ = _peerDataMap.remove(i.peer) + val newWaiting = r.waitingForDisconnection.+(i.peer) + val newPdm = r.peerDataMap.filterNot(_._1.peer == i.peer) + val newState = r + .replaceWaitingForDisconnection(newWaiting) + .replacePeers(newPdm) + val stopF: Future[Done] = client.stop().recoverWith { + case scala.util.control.NonFatal(err) => + logger.error(s"Failed to stop peer=${client.peer}", err) + Future.successful(Done) + } + stopF.map { _ => newState } }