Move PeerManager.connectPeer() logic into stream (#5340)

* Move PeerManager.connectPeer() logic into stream

* Fix comment
This commit is contained in:
Chris Stewart 2024-01-03 10:27:35 -06:00 committed by GitHub
parent d27dcb38f4
commit 30876c2cde
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 9 deletions

View File

@ -21,6 +21,8 @@ object NodeStreamMessage {
case class HeaderTimeoutWrapper(peer: Peer) extends NodeStreamMessage
case class ConnectPeer(peer: Peer) extends NodeStreamMessage
case class InitializeDisconnect(peer: Peer) extends NodeStreamMessage
case class DisconnectedPeer(peer: Peer, forceReconnect: Boolean)

View File

@ -52,14 +52,8 @@ case class PeerManager(
def connectedPeerCount: Int = _peerDataMap.size
override def connectPeer(peer: Peer): Future[Unit] = {
val curPeerData = finder.popFromCache(peer).get
_peerDataMap.put(peer, curPeerData)
val hasCf =
if (curPeerData.serviceIdentifier.nodeCompactFilters) "with filters"
else ""
logger.info(
s"Connected to peer $peer $hasCf. Connected peer count $connectedPeerCount")
Future.unit
val c = ConnectPeer(peer)
queue.offer(c).map(_ => ())
}
override def peers: Set[Peer] = _peerDataMap.keys.toSet
@ -270,7 +264,6 @@ case class PeerManager(
//if we have slots remaining, connect
if (connectedPeerCount < nodeAppConfig.maxConnectedPeers) {
connectPeer(peer)
.flatMap(_ => syncHelper(peer))
} else {
val notCf = peerDataMap
.filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
@ -555,6 +548,19 @@ case class PeerManager(
s"Cannot find a new peer to fulfill sync request, reverting to old state=$state")
state
}
case (state, c: ConnectPeer) =>
val peer = c.peer
val curPeerData = finder.popFromCache(peer).get
_peerDataMap.put(peer, curPeerData)
val hasCf =
if (curPeerData.serviceIdentifier.nodeCompactFilters) "with filters"
else ""
val newPeersWithSvcs =
state.peersWithServices + curPeerData.peerWithServicesOpt.get
val newState = state.replacePeers(newPeersWithSvcs)
logger.info(
s"Connected to peer $peer $hasCf. Connected peer count $connectedPeerCount")
syncHelper(c.peer).map(_ => newState)
case (state, i: InitializeDisconnect) =>
val client: PeerData = peerDataMap(i.peer)
_peerDataMap.remove(i.peer)