Reduce reliance of PeerManager._peerDataMap in favor of NodeState.peerDataMap (#5451)

* Reduce reliance of PeerManager._peerDataMap in favor of NodeState.peerDataMap

* Fix bug where we weren't removing peers from NodeState

* Remove invariants

* Empty commit to run CI

* Empty commit to re-run CI
This commit is contained in:
Chris Stewart 2024-03-05 08:16:29 -06:00 committed by GitHub
parent 0d2e0a98f0
commit 367285d9b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 36 additions and 42 deletions

View File

@ -31,6 +31,10 @@ sealed trait NodeRunningState extends NodeState {
def connectedPeerCount: Int = peers.size
def getPeerData(peer: Peer): Option[PersistentPeerData] = {
peerDataMap.find(_._1.peer == peer).map(_._2)
}
def getPeerConnection(peer: Peer): Option[PeerConnection] = {
peerDataMap.find(_._1.peer == peer).map(_._2.peerConnection) match {
case Some(peerConnection) => Some(peerConnection)

View File

@ -128,8 +128,6 @@ case class PeerManager(
private def replacePeer(replacePeer: Peer, withPeer: Peer): Future[Unit] = {
logger.debug(s"Replacing $replacePeer with $withPeer")
require(!peerDataMap(replacePeer).serviceIdentifier.nodeCompactFilters,
s"$replacePeer has cf")
for {
_ <- disconnectPeer(replacePeer)
_ <- connectPeer(withPeer)
@ -201,16 +199,18 @@ case class PeerManager(
state: NodeRunningState): Future[Unit] = {
logger.debug(s"onInitializationTimeout() peer=$peer state=$state")
val finder = state.peerFinder
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
require(!finder.hasPeer(peer) || !state.getPeerData(peer).isDefined,
s"$peer cannot be both a test and a persistent peer")
if (finder.hasPeer(peer)) {
//one of the peers that we tried, failed to init within time, disconnect
finder.getPeerData(peer).get.stop().map(_ => ())
} else if (peerDataMap.contains(peer)) {
} else if (state.getPeerData(peer).isDefined) {
//this is one of our persistent peers which must have been initialized earlier, this can happen in case of
//a reconnection attempt, meaning it got connected but failed to initialize, disconnect
peerDataMap(peer)
state
.getPeerData(peer)
.get
.stop()
.map(_ => ())
} else {
@ -276,14 +276,14 @@ case class PeerManager(
_ <- createInDb(peer, peerData.serviceIdentifier)
newState <- managePeerAfterInitialization(state, peer)
} yield {
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
require(!finder.hasPeer(peer) || !state.getPeerData(peer).isDefined,
s"$peer cannot be both a test and a persistent peer")
logger.debug(
s"Initialized peer $peer with compactFilter support=$hasCf")
newState
}
} else if (peerDataMap.contains(peer)) {
} else if (state.peers.contains(peer)) {
//one of the persistent peers initialized again, this can happen in case of a reconnection attempt
//which succeeded which is all good, do nothing
state match {
@ -315,19 +315,18 @@ case class PeerManager(
forceReconnect: Boolean,
state: NodeRunningState): Future[NodeState] = {
logger.info(
s"Disconnected peer=$peer peers=$peers state=$state forceReconnect=$forceReconnect peerDataMap=${peerDataMap
.map(_._1)}")
s"Disconnected peer=$peer peers=$peers state=$state forceReconnect=$forceReconnect")
val finder = state.peerFinder
val _ = onDisconnectSyncFiltersJob(peer)
val updateLastSeenF = PeerDAO().updateLastSeenTime(peer)
val stateF: Future[NodeState] = {
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
val stateF: Future[NodeRunningState] = {
require(!finder.hasPeer(peer) || !state.getPeerData(peer).isDefined,
s"$peer cannot be both a test and a persistent peer")
if (finder.hasPeer(peer)) {
finder.removePeer(peer)
Future.successful(state)
} else if (peerDataMap.contains(peer)) {
} else if (state.peers.contains(peer)) {
_peerDataMap.remove(peer)
onDisconnectNodeStateUpdate(state = state,
disconnectedPeer = peer,
@ -362,10 +361,10 @@ case class PeerManager(
val isShuttingDown = state.isInstanceOf[NodeShuttingDown]
val finder = state.peerFinder
if (state.peers.exists(_ != disconnectedPeer)) {
state match {
val rm = state.removePeer(disconnectedPeer)
rm match {
case s: SyncNodeState =>
switchSyncToRandomPeer(state = s,
excludePeerOpt = Some(disconnectedPeer))
syncHelper(s).map(_ => s)
case d: DoneSyncing =>
//defensively try to sync with the new peer
//this headerSync is not safe, need to exclude peer we are disconnencting
@ -382,7 +381,7 @@ case class PeerManager(
case x @ (_: DoneSyncing | _: NodeShuttingDown | _: MisbehavingPeer |
_: RemovePeers) =>
Future.successful(x.removePeer(disconnectedPeer))
Future.successful(x)
}
} else {
//no new peers to try to sync from, transition to done syncing?
@ -409,14 +408,12 @@ case class PeerManager(
private def onQueryTimeout(
payload: ExpectsResponse,
peer: Peer,
state: NodeState): Future[Unit] = {
state: NodeRunningState): Future[Unit] = {
logger.debug(s"Query timeout out for $peer with payload=${payload}")
//if we are removing this peer and an existing query timed out because of that
// peerData will not have this peer
if (peerDataMap.contains(peer)) {
peerDataMap(peer).updateLastFailureTime()
}
state.getPeerData(peer).map(_.updateLastFailureTime())
payload match {
case _: GetHeadersMessage =>
@ -458,10 +455,11 @@ case class PeerManager(
private def sendResponseTimeout(
peer: Peer,
payload: NetworkPayload): Future[Unit] = {
payload: NetworkPayload,
state: NodeRunningState): Future[Unit] = {
logger.debug(
s"Sending response timeout for ${payload.commandName} to $peer")
if (peerDataMap.contains(peer)) {
if (state.getPeerData(peer).isDefined) {
payload match {
case e: ExpectsResponse =>
queue
@ -584,7 +582,7 @@ case class PeerManager(
//now send request to stop actor which will be completed some time in future
val _ = _peerDataMap.remove(i.peer)
val _ = onDisconnectSyncFiltersJob(i.peer)
val newStateF =
onDisconnectNodeStateUpdate(state = running,
disconnectedPeer = i.peer,
@ -733,9 +731,17 @@ case class PeerManager(
}
case (state, q: QueryTimeout) =>
onQueryTimeout(q.payload, q.peer, state).map(_ => state)
state match {
case running: NodeRunningState =>
onQueryTimeout(q.payload, q.peer, running).map(_ => state)
}
case (state, srt: SendResponseTimeout) =>
sendResponseTimeout(srt.peer, srt.payload).map(_ => state)
state match {
case running: NodeRunningState =>
sendResponseTimeout(srt.peer, srt.payload, running).map(_ => state)
}
case (state, gossipMessage: GossipMessage) =>
state match {
case runningState: NodeRunningState =>
@ -759,8 +765,7 @@ case class PeerManager(
sender.sendMsg(msg)
case None =>
logger.warn(
s"Attempting to gossip to peer that is available in state.peers, but not peerDataMap? state=$state peerDataMap=${peerDataMap
.map(_._1)}")
s"Attempting to gossip to peer that is available in state.peers, but not peerDataMap? state=$state")
Future.unit
}
}
@ -809,21 +814,6 @@ case class PeerManager(
}
}
private def switchSyncToRandomPeer(
state: SyncNodeState,
excludePeerOpt: Option[Peer]): Future[SyncNodeState] = {
val randomPeerOpt =
state.randomPeer(excludePeers = excludePeerOpt.toSet,
ServiceIdentifier.NODE_COMPACT_FILTERS)
randomPeerOpt match {
case Some(peer) =>
switchSyncToPeer(oldSyncState = state, newPeer = peer)
case None =>
//if we have no new peers should we just switch to DoneSyncing?
Future.successful(state)
}
}
private def sendToPeerHelper(
state: NodeRunningState,
stp: SendToPeer): Future[NodeRunningState] = {