diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala index 9b1f96b5f6..e559a0623a 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala @@ -347,9 +347,10 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair { _ <- AsyncUtil.nonBlockingSleep(1.second) initConnectionCount <- node.getConnectionCount _ = assert(initConnectionCount == 2) - nodeUri0 <- NodeTestUtil.getNodeURIFromBitcoind(bitcoinds(0)) - peer0 <- NodeTestUtil.getBitcoindPeer(bitcoinds(0)) - _ <- bitcoinds(0).disconnectNode(nodeUri0) + bitcoind0 = bitcoinds(0) + nodeUri0 <- NodeTestUtil.getNodeURIFromBitcoind(bitcoind0) + peer0 <- NodeTestUtil.getBitcoindPeer(bitcoind0) + _ <- bitcoind0.disconnectNode(nodeUri0) _ <- AsyncUtil.retryUntilSatisfiedF( () => node.peerManager.isDisconnected(peer0), 1.second) diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index 870c6a28a1..0b4032f3fa 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -449,51 +449,52 @@ case class PeerManager( //client actor for one of the test peers stopped, can remove it from map now finder.removePeer(peer).map(_ => state) } else if (peerDataMap.contains(peer)) { - //actor stopped for one of the persistent peers, can happen in case a reconnection attempt failed due to - //reconnection tries exceeding the max limit in which the client was stopped to disconnect from it, remove it + val peerData = _peerDataMap(peer) _peerDataMap.remove(peer) - //getDataMesageHandler.state is already mutated from another thread - //this will be set to the new sync peer not the old one. - val syncPeerOpt = state match { - case s: SyncNodeState => - Some(s.syncPeer) - case m: MisbehavingPeer => Some(m.badPeer) - case _: DoneSyncing | _: RemovePeers => - None - } - val shouldReconnect = - (forceReconnect || connectedPeerCount == 0) && isStarted.get - if (peers.exists(_ != peer)) { - val randomPeerOpt = randomPeerWithService( - ServiceIdentifier.NODE_COMPACT_FILTERS) - randomPeerOpt match { - case Some(peer) => - state match { - case syncState: SyncNodeState => - switchSyncToPeer(oldSyncState = syncState, newPeer = peer) - case d: DoneSyncing => - //defensively try to sync with the new peer - syncHelper(Some(peer)).map(_ => d) - case x @ (_: MisbehavingPeer | _: RemovePeers) => - Future.successful(x) - } - case None => - //if we have no new peers should we just switch to DoneSyncing? + peerData.stop().flatMap { _ => + //getDataMesageHandler.state is already mutated from another thread + //this will be set to the new sync peer not the old one. + val syncPeerOpt = state match { + case s: SyncNodeState => + Some(s.syncPeer) + case m: MisbehavingPeer => Some(m.badPeer) + case _: DoneSyncing | _: RemovePeers => + None + } + val shouldReconnect = + (forceReconnect || connectedPeerCount == 0) && isStarted.get + if (peers.exists(_ != peer)) { + val randomPeerOpt = + randomPeerWithService(ServiceIdentifier.NODE_COMPACT_FILTERS) + randomPeerOpt match { + case Some(peer) => + state match { + case syncState: SyncNodeState => + switchSyncToPeer(oldSyncState = syncState, newPeer = peer) + case d: DoneSyncing => + //defensively try to sync with the new peer + syncHelper(Some(peer)).map(_ => d) + case x @ (_: MisbehavingPeer | _: RemovePeers) => + Future.successful(x) + } + case None => + //if we have no new peers should we just switch to DoneSyncing? + Future.successful(state) + } + } else if (syncPeerOpt.isDefined) { + if (shouldReconnect) { + finder.reconnect(peer).map(_ => state) + } else { + val exn = new RuntimeException( + s"No new peers to sync from, cannot start new sync. Terminated sync with peer=$peer current syncPeer=$syncPeerOpt state=${state} peers=$peers") + Future.failed(exn) + } + } else { + if (shouldReconnect) { + finder.reconnect(peer).map(_ => state) + } else { Future.successful(state) - } - } else if (syncPeerOpt.isDefined) { - if (shouldReconnect) { - finder.reconnect(peer).map(_ => state) - } else { - val exn = new RuntimeException( - s"No new peers to sync from, cannot start new sync. Terminated sync with peer=$peer current syncPeer=$syncPeerOpt state=${state} peers=$peers") - Future.failed(exn) - } - } else { - if (shouldReconnect) { - finder.reconnect(peer).map(_ => state) - } else { - Future.successful(state) + } } } } else if (state.waitingForDisconnection.contains(peer)) { diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerConnection.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerConnection.scala index 76e32b3dfd..2e1c7288e6 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerConnection.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerConnection.scala @@ -324,6 +324,7 @@ case class PeerConnection(peer: Peer, peerManager: PeerManager)(implicit case Some(cg) => logger.info(s"Disconnecting peer=${peer}") cg.stop() + connectionGraphOpt = None Future.unit case None => val err =