Reset PeerConnection.connectionGraphOpt = None when connection is disconnected (#5290)

* Reset PeerConnection.connectionGraphOpt = None when connection is disconnected

* Move resetting connectionGraphOpt into PeerConnection.disconnect()

* Call PeerData.stop() inside of PeerManager.onDisconnect()

* Empty commit to run CI

* Empty commit to re-run CI
This commit is contained in:
Chris Stewart 2023-11-08 08:00:51 -06:00 committed by GitHub
parent c1baad9f06
commit 7b8df425fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 49 additions and 46 deletions

View File

@ -347,9 +347,10 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
_ <- AsyncUtil.nonBlockingSleep(1.second)
initConnectionCount <- node.getConnectionCount
_ = assert(initConnectionCount == 2)
nodeUri0 <- NodeTestUtil.getNodeURIFromBitcoind(bitcoinds(0))
peer0 <- NodeTestUtil.getBitcoindPeer(bitcoinds(0))
_ <- bitcoinds(0).disconnectNode(nodeUri0)
bitcoind0 = bitcoinds(0)
nodeUri0 <- NodeTestUtil.getNodeURIFromBitcoind(bitcoind0)
peer0 <- NodeTestUtil.getBitcoindPeer(bitcoind0)
_ <- bitcoind0.disconnectNode(nodeUri0)
_ <- AsyncUtil.retryUntilSatisfiedF(
() => node.peerManager.isDisconnected(peer0),
1.second)

View File

@ -449,51 +449,52 @@ case class PeerManager(
//client actor for one of the test peers stopped, can remove it from map now
finder.removePeer(peer).map(_ => state)
} else if (peerDataMap.contains(peer)) {
//actor stopped for one of the persistent peers, can happen in case a reconnection attempt failed due to
//reconnection tries exceeding the max limit in which the client was stopped to disconnect from it, remove it
val peerData = _peerDataMap(peer)
_peerDataMap.remove(peer)
//getDataMesageHandler.state is already mutated from another thread
//this will be set to the new sync peer not the old one.
val syncPeerOpt = state match {
case s: SyncNodeState =>
Some(s.syncPeer)
case m: MisbehavingPeer => Some(m.badPeer)
case _: DoneSyncing | _: RemovePeers =>
None
}
val shouldReconnect =
(forceReconnect || connectedPeerCount == 0) && isStarted.get
if (peers.exists(_ != peer)) {
val randomPeerOpt = randomPeerWithService(
ServiceIdentifier.NODE_COMPACT_FILTERS)
randomPeerOpt match {
case Some(peer) =>
state match {
case syncState: SyncNodeState =>
switchSyncToPeer(oldSyncState = syncState, newPeer = peer)
case d: DoneSyncing =>
//defensively try to sync with the new peer
syncHelper(Some(peer)).map(_ => d)
case x @ (_: MisbehavingPeer | _: RemovePeers) =>
Future.successful(x)
}
case None =>
//if we have no new peers should we just switch to DoneSyncing?
peerData.stop().flatMap { _ =>
//getDataMesageHandler.state is already mutated from another thread
//this will be set to the new sync peer not the old one.
val syncPeerOpt = state match {
case s: SyncNodeState =>
Some(s.syncPeer)
case m: MisbehavingPeer => Some(m.badPeer)
case _: DoneSyncing | _: RemovePeers =>
None
}
val shouldReconnect =
(forceReconnect || connectedPeerCount == 0) && isStarted.get
if (peers.exists(_ != peer)) {
val randomPeerOpt =
randomPeerWithService(ServiceIdentifier.NODE_COMPACT_FILTERS)
randomPeerOpt match {
case Some(peer) =>
state match {
case syncState: SyncNodeState =>
switchSyncToPeer(oldSyncState = syncState, newPeer = peer)
case d: DoneSyncing =>
//defensively try to sync with the new peer
syncHelper(Some(peer)).map(_ => d)
case x @ (_: MisbehavingPeer | _: RemovePeers) =>
Future.successful(x)
}
case None =>
//if we have no new peers should we just switch to DoneSyncing?
Future.successful(state)
}
} else if (syncPeerOpt.isDefined) {
if (shouldReconnect) {
finder.reconnect(peer).map(_ => state)
} else {
val exn = new RuntimeException(
s"No new peers to sync from, cannot start new sync. Terminated sync with peer=$peer current syncPeer=$syncPeerOpt state=${state} peers=$peers")
Future.failed(exn)
}
} else {
if (shouldReconnect) {
finder.reconnect(peer).map(_ => state)
} else {
Future.successful(state)
}
} else if (syncPeerOpt.isDefined) {
if (shouldReconnect) {
finder.reconnect(peer).map(_ => state)
} else {
val exn = new RuntimeException(
s"No new peers to sync from, cannot start new sync. Terminated sync with peer=$peer current syncPeer=$syncPeerOpt state=${state} peers=$peers")
Future.failed(exn)
}
} else {
if (shouldReconnect) {
finder.reconnect(peer).map(_ => state)
} else {
Future.successful(state)
}
}
}
} else if (state.waitingForDisconnection.contains(peer)) {

View File

@ -324,6 +324,7 @@ case class PeerConnection(peer: Peer, peerManager: PeerManager)(implicit
case Some(cg) =>
logger.info(s"Disconnecting peer=${peer}")
cg.stop()
connectionGraphOpt = None
Future.unit
case None =>
val err =