Fix bug where we weren't removing peer from NodeRunningState.waitingForDisconnection (#5425)

* Fix bug where we weren't removing peer from NodeRunningState.waitingForDisconnection

* Empty commit to re-run CI

* Remove _peerDataMap.remove(i.peer)

* Empty commit to re-run CI

* Replace state even if client.stop() results in a failed Future
This commit is contained in:
Chris Stewart 2024-02-27 15:38:16 -06:00 committed by GitHub
parent 0053ccd853
commit ab0b0e2209
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,6 +1,7 @@
package org.bitcoins.node package org.bitcoins.node
import grizzled.slf4j.Logging import grizzled.slf4j.Logging
import org.apache.pekko.Done
import org.apache.pekko.actor.{ActorSystem, Cancellable} import org.apache.pekko.actor.{ActorSystem, Cancellable}
import org.apache.pekko.stream.scaladsl.{Sink, SourceQueue} import org.apache.pekko.stream.scaladsl.{Sink, SourceQueue}
import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.asyncutil.AsyncUtil
@ -334,9 +335,11 @@ case class PeerManager(
Future.successful(state) Future.successful(state)
} else if (peerDataMap.contains(peer)) { } else if (peerDataMap.contains(peer)) {
_peerDataMap.remove(peer) _peerDataMap.remove(peer)
val rm = state.waitingForDisconnection.-(peer)
val rmWaitingForDisconnect = state.replaceWaitingForDisconnection(rm)
val isShuttingDown = state.isInstanceOf[NodeShuttingDown] val isShuttingDown = state.isInstanceOf[NodeShuttingDown]
if (state.peers.exists(_ != peer)) { if (state.peers.exists(_ != peer)) {
state match { rmWaitingForDisconnect match {
case s: SyncNodeState => switchSyncToRandomPeer(s, Some(peer)) case s: SyncNodeState => switchSyncToRandomPeer(s, Some(peer))
case d: DoneSyncing => case d: DoneSyncing =>
//defensively try to sync with the new peer //defensively try to sync with the new peer
@ -355,16 +358,16 @@ case class PeerManager(
} else { } else {
if (forceReconnect && !isShuttingDown) { if (forceReconnect && !isShuttingDown) {
finder.reconnect(peer).map(_ => state) finder.reconnect(peer).map(_ => rmWaitingForDisconnect)
} else if (!isShuttingDown) { } else if (!isShuttingDown) {
logger.info( logger.info(
s"No new peers to connect to, querying for new connections... state=${state} peers=$peers") s"No new peers to connect to, querying for new connections... state=${state} peers=$peers")
finder.queryForPeerConnections(Set(peer)) match { finder.queryForPeerConnections(Set(peer)) match {
case Some(_) => Future.successful(state) case Some(_) => Future.successful(rmWaitingForDisconnect)
case None => case None =>
logger.debug( logger.debug(
s"Could not query for more peer connections as previous job is still running") s"Could not query for more peer connections as previous job is still running")
Future.successful(state) Future.successful(rmWaitingForDisconnect)
} }
} else { } else {
//if shutting down, do nothing //if shutting down, do nothing
@ -373,7 +376,7 @@ case class PeerManager(
} }
} else if (state.waitingForDisconnection.contains(peer)) { } else if (state.waitingForDisconnection.contains(peer)) {
//a peer we wanted to disconnect has remove has stopped the client actor, finally mark this as deleted //a peer we wanted to disconnect has remove has stopped the client actor, finally mark this as deleted
val removed = state.waitingForDisconnection.removedAll(Set(peer)) val removed = state.waitingForDisconnection.-(peer)
val newState = state.replaceWaitingForDisconnection(removed) val newState = state.replaceWaitingForDisconnection(removed)
newState match { newState match {
case s: SyncNodeState => case s: SyncNodeState =>
@ -592,12 +595,18 @@ case class PeerManager(
//leading to a memory leak may happen //leading to a memory leak may happen
//now send request to stop actor which will be completed some time in future //now send request to stop actor which will be completed some time in future
client.stop().map { _ => val _ = _peerDataMap.remove(i.peer)
val newWaiting = r.waitingForDisconnection.+(i.peer) val newWaiting = r.waitingForDisconnection.+(i.peer)
val newPdm = r.peerDataMap.filterNot(_._1.peer == i.peer) val newPdm = r.peerDataMap.filterNot(_._1.peer == i.peer)
val newState = r val newState = r
.replaceWaitingForDisconnection(newWaiting) .replaceWaitingForDisconnection(newWaiting)
.replacePeers(newPdm) .replacePeers(newPdm)
val stopF: Future[Done] = client.stop().recoverWith {
case scala.util.control.NonFatal(err) =>
logger.error(s"Failed to stop peer=${client.peer}", err)
Future.successful(Done)
}
stopF.map { _ =>
newState newState
} }
} }