Refactor peerData -> peerDataMap, try to fix #4955 (#5030)

* Refactor peerData -> peerDataMap, try to fix #4955

* scalafmt

* Empty commit to re-run CI
Chris Stewart 2023-04-03 06:35:39 -05:00 committed by GitHub
parent 54b47152d1
commit e791932f99
6 changed files with 60 additions and 56 deletions
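The pattern being renamed is unchanged by this commit: a private mutable map guarded inside PeerManager, plus a public accessor that snapshots it. A minimal self-contained sketch of that shape (Peer and PeerData here are simplified stand-ins for the real types):

import scala.collection.mutable

//simplified stand-ins for the node's Peer and PeerData types
case class Peer(address: String)
case class PeerData(hasFailedRecently: Boolean)

class PeerManagerSketch {
  //internal mutable state, only mutated by the manager itself
  private val _peerDataMap: mutable.Map[Peer, PeerData] = mutable.Map.empty

  //public read-only view: .toMap copies into an immutable Map, so callers
  //iterate over a stable snapshot while the manager keeps mutating the original
  def peerDataMap: Map[Peer, PeerData] = _peerDataMap.toMap

  def addPeer(peer: Peer, data: PeerData): Unit = _peerDataMap.put(peer, data)
  def removePeer(peer: Peer): Unit = _peerDataMap.remove(peer)
}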

@@ -99,7 +99,8 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
def allDisconn: Future[Unit] = AsyncUtil.retryUntilSatisfied(
peers
.map(p =>
- !peerManager.peerData.contains(p) && !peerManager.waitingForDeletion
+ !peerManager.peerDataMap.contains(
+ p) && !peerManager.waitingForDeletion
.contains(p))
.forall(_ == true),
maxTries = 5,
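The test above polls with AsyncUtil.retryUntilSatisfied until every peer is gone. Roughly, such a retry-until-true helper has the following shape (a sketch of the idea, not bitcoin-s's actual implementation; the interval and maxTries defaults are assumed):

import akka.actor.ActorSystem
import akka.pattern.after
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

//re-evaluate `condition` until it is true, waiting `interval` between tries,
//failing after `maxTries` attempts
def retryUntilSatisfied(condition: => Boolean,
                        interval: FiniteDuration = 100.millis,
                        maxTries: Int = 10)(implicit
    system: ActorSystem,
    ec: ExecutionContext): Future[Unit] = {
  if (condition) Future.unit
  else if (maxTries <= 0)
    Future.failed(new RuntimeException("Condition never satisfied"))
  else
    //re-check after `interval`, decrementing the remaining tries
    after(interval, system.scheduler)(
      retryUntilSatisfied(condition, interval, maxTries - 1))
}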

@@ -81,7 +81,7 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
GetHeadersMessage(node.chainConfig.chain.genesisHash))
//waiting for response to header query now
client <- peerManager
- .peerData(bitcoindPeers(0))
+ .peerDataMap(bitcoindPeers(0))
.peerMessageSender
.map(_.client)
_ = client.actor ! expectHeaders
@@ -140,7 +140,7 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
node.chainConfig))
invalidHeaderMessage = HeadersMessage(headers = Vector(invalidHeader))
- sender <- node.peerManager.peerData(peer).peerMessageSender
+ sender <- node.peerManager.peerDataMap(peer).peerMessageSender
_ <- node.getDataMessageHandler.addToStream(invalidHeaderMessage,
sender,
peer)
@@ -159,7 +159,7 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
def sendInvalidHeaders(peer: Peer): Future[Unit] = {
val invalidHeaderMessage =
HeadersMessage(headers = Vector(invalidHeader))
- val senderF = node.peerManager.peerData(peer).peerMessageSender
+ val senderF = node.peerManager.peerDataMap(peer).peerMessageSender
for {
sender <- senderF

@@ -105,7 +105,7 @@ case class NeutrinoNode(
_ = logger.info(s"Syncing with $syncPeer")
_ = updateDataMessageHandler(
dataMessageHandler.copy(syncPeer = Some(syncPeer)))
- peerMsgSender <- peerManager.peerData(syncPeer).peerMessageSender
+ peerMsgSender <- peerManager.peerDataMap(syncPeer).peerMessageSender
header <- chainApi.getBestBlockHeader()
bestFilterHeaderOpt <- chainApi.getBestFilterHeader()
bestFilterOpt <- chainApi.getBestFilter()
@@ -166,7 +166,7 @@
bestFilterOpt: Option[CompactFilterDb]): Future[Unit] = {
val syncPeerMsgSenderOptF = {
dataMessageHandler.syncPeer.map { peer =>
- peerManager.peerData(peer).peerMessageSender
+ peerManager.peerDataMap(peer).peerMessageSender
}
}
val sendCompactFilterHeaderMsgF = syncPeerMsgSenderOptF match {

@@ -76,7 +76,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
* `private[node]`.
*/
def send(msg: NetworkPayload, peer: Peer): Future[Unit] = {
- val senderF = peerManager.peerData(peer).peerMessageSender
+ val senderF = peerManager.peerDataMap(peer).peerMessageSender
senderF.flatMap(_.sendMsg(msg))
}
@@ -197,7 +197,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
syncPeerOpt match {
case Some(peer) =>
peerManager
- .peerData(peer)
+ .peerDataMap(peer)
.peerMessageSender
.flatMap(_.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock,
blockHashes: _*))

@@ -30,7 +30,7 @@ case class PeerManager(
extends StartStopAsync[PeerManager]
with P2PLogger {
- private val _peerData: mutable.Map[Peer, PeerData] = mutable.Map.empty
+ private val _peerDataMap: mutable.Map[Peer, PeerData] = mutable.Map.empty
/** holds peers removed from peerDataMap whose client actors are not stopped yet. Used for runtime sanity checks. */
private val _waitingForDeletion: mutable.Set[Peer] = mutable.Set.empty
@@ -51,12 +51,12 @@ case class PeerManager(
finder.addToTry(peers, priority)
}
- def connectedPeerCount: Int = _peerData.size
+ def connectedPeerCount: Int = _peerDataMap.size
def addPeer(peer: Peer): Future[Unit] = {
require(finder.hasPeer(peer), s"Unknown $peer marked as usable")
val curPeerData = finder.popFromCache(peer).get
- _peerData.put(peer, curPeerData)
+ _peerDataMap.put(peer, curPeerData)
val hasCf =
if (curPeerData.serviceIdentifier.nodeCompactFilters) "with filters"
else ""
@@ -65,16 +65,16 @@ case class PeerManager(
Future.unit
}
- def peers: Vector[Peer] = _peerData.keys.toVector
+ def peers: Vector[Peer] = _peerDataMap.keys.toVector
def peerMsgSendersF: Future[Vector[PeerMessageSender]] = {
Future
- .traverse(_peerData.values)(_.peerMessageSender)
+ .traverse(_peerDataMap.values)(_.peerMessageSender)
.map(_.toVector)
}
def getPeerMsgSender(peer: Peer): Future[Option[PeerMessageSender]] = {
- _peerData.find(_._1 == peer).map(_._2.peerMessageSender) match {
+ _peerDataMap.find(_._1 == peer).map(_._2.peerMessageSender) match {
case Some(peerMsgSender) => peerMsgSender.map(Some(_))
case None => Future.successful(None)
}
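A side note on the getPeerMsgSender hunk above, not part of this commit: find(_._1 == peer) scans the map entry by entry, whereas Map.get(peer) is the direct keyed lookup and is behaviorally equivalent here. Reusing the surrounding file's own types, the body could read:

def getPeerMsgSender(peer: Peer): Future[Option[PeerMessageSender]] = {
  //direct lookup instead of a linear scan over the entries
  _peerDataMap.get(peer).map(_.peerMessageSender) match {
    case Some(peerMsgSender) => peerMsgSender.map(Some(_))
    case None                => Future.successful(None)
  }
}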
@@ -96,13 +96,13 @@ case class PeerManager(
waitF.map { _ =>
val filteredPeers =
- peerData
+ peerDataMap
.filter(p => p._2.serviceIdentifier.hasServicesOf(services))
.keys
.toVector
require(filteredPeers.nonEmpty)
val (good, failedRecently) =
- filteredPeers.partition(p => !peerData(p).hasFailedRecently)
+ filteredPeers.partition(p => !peerDataMap(p).hasFailedRecently)
if (good.nonEmpty) good(Random.nextInt(good.length))
else
@@ -113,7 +113,7 @@ case class PeerManager(
def randomPeerMsgSenderWithService(
services: ServiceIdentifier): Future[PeerMessageSender] = {
val randomPeerF = randomPeerWithService(services)
- randomPeerF.flatMap(peer => peerData(peer).peerMessageSender)
+ randomPeerF.flatMap(peer => peerDataMap(peer).peerMessageSender)
}
def createInDb(
@@ -145,13 +145,14 @@ case class PeerManager(
private def awaitPeerWithService(
services: ServiceIdentifier,
timeout: Duration): Future[Unit] = {
logger.debug(s"Waiting for peer connection. ${_peerData.keys}")
logger.debug(s"Waiting for peer connection. ${_peerDataMap.keys}")
val promise = Promise[Unit]()
var counter = 0
val cancellable =
system.scheduler.scheduleAtFixedRate(0.seconds, 1.second) { () =>
if (
- _peerData.exists(x => x._2.serviceIdentifier.hasServicesOf(services))
+ _peerDataMap.exists(x =>
+ x._2.serviceIdentifier.hasServicesOf(services))
) {
promise.success(())
} else if (counter == timeout.getSeconds.toInt) {
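The hunk above shows the shape of awaitPeerWithService: a Promise completed from a fixed-rate scheduler tick that either observes the condition or counts up to the timeout. A self-contained sketch of that pattern (pollUntil is a hypothetical name):

import akka.actor.ActorSystem
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._

//poll `condition` once per second; succeed when it holds,
//fail once `timeoutSeconds` ticks have elapsed
def pollUntil(condition: () => Boolean, timeoutSeconds: Int)(implicit
    system: ActorSystem): Future[Unit] = {
  import system.dispatcher
  val promise = Promise[Unit]()
  var counter = 0
  val cancellable =
    system.scheduler.scheduleAtFixedRate(0.seconds, 1.second) { () =>
      if (condition()) promise.trySuccess(())
      else if (counter >= timeoutSeconds)
        promise.tryFailure(new RuntimeException("No peer within timeout"))
      else counter += 1
    }
  //stop the ticker however the promise completed
  promise.future.andThen { case _ => cancellable.cancel() }
}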
@@ -181,7 +182,7 @@ case class PeerManager(
def replacePeer(replacePeer: Peer, withPeer: Peer): Future[Unit] = {
logger.debug(s"Replacing $replacePeer with $withPeer")
- assert(!peerData(replacePeer).serviceIdentifier.nodeCompactFilters,
+ assert(!peerDataMap(replacePeer).serviceIdentifier.nodeCompactFilters,
s"$replacePeer has cf")
for {
_ <- removePeer(replacePeer)
@@ -193,8 +194,8 @@ case class PeerManager(
def removePeer(peer: Peer): Future[Unit] = {
logger.debug(s"Removing persistent peer $peer")
- val client: PeerData = peerData(peer)
- _peerData.remove(peer)
+ val client: PeerData = peerDataMap(peer)
+ _peerDataMap.remove(peer)
//we remove it from the map of connected peers so that no more requests are sent to it, but we don't fully
//delete it until its actor is stopped, so we can never end up with a peer that has been deleted while its
//actor is still running, which would leak memory
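The comment above describes a two-phase teardown: a removed peer leaves the request map immediately but stays in _waitingForDeletion until its actor confirms shutdown. A minimal sketch of that pattern, with simplified stand-in types:

import scala.collection.mutable

case class Peer(address: String)
trait PeerData { def stop(): Unit }

class TwoPhaseRemovalSketch {
  private val live = mutable.Map.empty[Peer, PeerData]
  private val waitingForDeletion = mutable.Set.empty[Peer]

  //phase 1: stop routing requests to the peer, but remember it until its
  //actor confirms shutdown, so nothing leaks if stop() is asynchronous
  def remove(peer: Peer): Unit = live.remove(peer).foreach { data =>
    waitingForDeletion += peer
    data.stop()
  }

  //phase 2: the actor's stopped-callback finally forgets the peer
  def onStopped(peer: Peer): Unit = waitingForDeletion -= peer
}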
@@ -204,7 +205,7 @@ case class PeerManager(
}
def isReconnection(peer: Peer): Boolean = {
- peerData.contains(peer)
+ peerDataMap.contains(peer)
}
override def start(): Future[PeerManager] = {
@@ -215,7 +216,7 @@ case class PeerManager(
}
}
- def peerData: Map[Peer, PeerData] = _peerData.toMap
+ def peerDataMap: Map[Peer, PeerData] = _peerDataMap.toMap
override def stop(): Future[PeerManager] = {
logger.info(s"Stopping PeerManager")
@@ -228,7 +229,7 @@ case class PeerManager(
val removeF = Future.sequence(peers.map(removePeer))
val managerStopF = AsyncUtil.retryUntilSatisfied(
- _peerData.isEmpty && waitingForDeletion.isEmpty,
+ _peerDataMap.isEmpty && waitingForDeletion.isEmpty,
interval = 1.seconds,
maxTries = 30)
@@ -252,28 +253,28 @@ case class PeerManager(
}
def isConnected(peer: Peer): Future[Boolean] = {
- if (peerData.contains(peer))
- peerData(peer).peerMessageSender.flatMap(_.isConnected())
+ if (peerDataMap.contains(peer))
+ peerDataMap(peer).peerMessageSender.flatMap(_.isConnected())
else Future.successful(false)
}
def isInitialized(peer: Peer): Future[Boolean] = {
- if (peerData.contains(peer))
- peerData(peer).peerMessageSender.flatMap(_.isInitialized())
+ if (peerDataMap.contains(peer))
+ peerDataMap(peer).peerMessageSender.flatMap(_.isInitialized())
else Future.successful(false)
}
def onInitializationTimeout(peer: Peer): Future[Unit] = {
- require(!finder.hasPeer(peer) || !peerData.contains(peer),
+ require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
s"$peer cannot be both a test and a persistent peer")
if (finder.hasPeer(peer)) {
//one of the peers that we tried failed to init within time; disconnect it
finder.getData(peer).stop()
- } else if (peerData.contains(peer)) {
+ } else if (peerDataMap.contains(peer)) {
//this is one of our persistent peers which must have been initialized earlier, this can happen in case of
//a reconnection attempt, meaning it got connected but failed to initialize, disconnect
- peerData(peer).stop()
+ peerDataMap(peer).stop()
} else {
//this should never happen
logger.warn(s"onInitializationTimeout called for unknown $peer")
@@ -282,17 +283,18 @@ case class PeerManager(
}
def onInitialization(peer: Peer): Future[Unit] = {
- assert(!finder.hasPeer(peer) || !peerData.contains(peer),
- s"$peer cannot be both a test and a persistent peer")
+ require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
+ s"$peer cannot be both a test and a persistent peer")
//this assumes neutrino and checks for compact filter support so should not be called for anything else
- assert(nodeAppConfig.nodeType == NodeType.NeutrinoNode,
- s"Node cannot be ${nodeAppConfig.nodeType.shortName}")
+ require(nodeAppConfig.nodeType == NodeType.NeutrinoNode,
+ s"Node cannot be ${nodeAppConfig.nodeType.shortName}")
if (finder.hasPeer(peer)) {
//one of the peers we tried got initialized successfully
- val hasCf = finder.getData(peer).serviceIdentifier.nodeCompactFilters
+ val peerData = finder.getData(peer)
+ val serviceIdentifer = peerData.serviceIdentifier
+ val hasCf = serviceIdentifer.nodeCompactFilters
logger.debug(s"Initialized peer $peer with $hasCf")
def sendAddrReq: Future[Unit] =
@@ -303,7 +305,7 @@ case class PeerManager(
if (connectedPeerCount < nodeAppConfig.maxConnectedPeers) {
addPeer(peer)
} else {
- lazy val notCf = peerData
+ lazy val notCf = peerDataMap
.filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
.keys
@@ -327,11 +329,12 @@ case class PeerManager(
for {
_ <- sendAddrReq
- _ <- createInDb(peer, finder.getData(peer).serviceIdentifier)
+ peerData = finder.getData(peer)
+ _ <- createInDb(peer, peerData.serviceIdentifier)
_ <- managePeerF()
} yield ()
- } else if (peerData.contains(peer)) {
+ } else if (peerDataMap.contains(peer)) {
//one of the persistent peers initialized again; this can happen when a reconnection attempt
//succeeds, which is fine, so do nothing
Future.unit
@@ -342,8 +345,8 @@ case class PeerManager(
}
def onP2PClientStopped(peer: Peer): Future[Unit] = {
- assert(!finder.hasPeer(peer) || !peerData.contains(peer),
- s"$peer cannot be both a test and a persistent peer")
+ require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
+ s"$peer cannot be both a test and a persistent peer")
logger.info(s"Client stopped for $peer")
@@ -351,10 +354,10 @@ case class PeerManager(
//client actor for one of the test peers stopped, can remove it from map now
finder.removePeer(peer)
Future.unit
- } else if (peerData.contains(peer)) {
+ } else if (peerDataMap.contains(peer)) {
//the actor for one of the persistent peers stopped; this can happen when a reconnection attempt failed because
//the retry limit was exceeded and the client was stopped to disconnect from the peer, so remove it
- _peerData.remove(peer)
+ _peerDataMap.remove(peer)
val syncPeer = node.getDataMessageHandler.syncPeer
if (peers.length > 1 && syncPeer.isDefined && syncPeer.get == peer) {
syncFromNewPeer().map(_ => ())
@@ -376,14 +379,14 @@ case class PeerManager(
}
def onVersionMessage(peer: Peer, versionMsg: VersionMessage): Unit = {
- require(!finder.hasPeer(peer) || !peerData.contains(peer),
+ require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
s"$peer cannot be both a test and a persistent peer")
if (finder.hasPeer(peer)) {
finder.getData(peer).setServiceIdentifier(versionMsg.services)
- } else if (peerData.contains(peer)) {
+ } else if (peerDataMap.contains(peer)) {
require(
- peerData(peer).serviceIdentifier.bytes == versionMsg.services.bytes)
+ peerDataMap(peer).serviceIdentifier.bytes == versionMsg.services.bytes)
} else {
logger.warn(s"onVersionMessage called for unknown $peer")
}
@@ -394,8 +397,8 @@ case class PeerManager(
//if we are removing this peer and an existing query timed out because of that,
//peerDataMap will not have this peer
- if (peerData.contains(peer)) {
- peerData(peer).updateLastFailureTime()
+ if (peerDataMap.contains(peer)) {
+ peerDataMap(peer).updateLastFailureTime()
}
payload match {
@@ -416,8 +419,8 @@ case class PeerManager(
def sendResponseTimeout(peer: Peer, payload: NetworkPayload): Future[Unit] = {
logger.debug(
s"Sending response timeout for ${payload.commandName} to $peer")
- if (peerData.contains(peer)) {
- peerData(peer).peerMessageSender.map(
+ if (peerDataMap.contains(peer)) {
+ peerDataMap(peer).peerMessageSender.map(
_.client.actor ! ResponseTimeout(payload))
} else {
logger.debug(s"Requested to send response timeout for unknown $peer")

@@ -346,7 +346,7 @@ case class DataMessageHandler(
logger.info(
s"Starting to validate headers now. Verifying with ${newState.verifyingWith}")
- val getHeadersAllF = manager.peerData
+ val getHeadersAllF = manager.peerDataMap
.filter(_._1 != peer)
.map(
_._2.peerMessageSender.flatMap(
@@ -535,10 +535,10 @@
peerMsgSender: PeerMessageSender): Future[DataMessageHandler] = {
state match {
case HeaderSync =>
- manager.peerData(peer).updateInvalidMessageCount()
+ manager.peerDataMap(peer).updateInvalidMessageCount()
if (
manager
- .peerData(peer)
+ .peerDataMap(peer)
.exceededMaxInvalidMessages && manager.peers.size > 1
) {
logger.info(
@@ -586,7 +586,7 @@
ServiceIdentifier.NODE_COMPACT_FILTERS)
newDmh = currentDmh.copy(syncPeer = Some(peer))
_ = logger.info(s"Now syncing filter headers from $peer")
- sender <- manager.peerData(peer).peerMessageSender
+ sender <- manager.peerDataMap(peer).peerMessageSender
newSyncing <- sendFirstGetCompactFilterHeadersCommand(sender)
} yield {
val syncPeerOpt = if (newSyncing) {