Refactor PeerFinder.peerConnectionScheduler() into method (#5377)

This commit is contained in:
Chris Stewart 2024-01-30 14:35:17 -06:00 committed by GitHub
parent 76e468c5c4
commit 75f6a3b4ec
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -112,7 +112,9 @@ case class PeerFinder(
private val isConnectionSchedulerRunning = new AtomicBoolean(false)
private lazy val peerConnectionScheduler: Cancellable =
private[this] var peerConnectionCancellableOpt: Option[Cancellable] = None
private def peerConnectionScheduler(): Cancellable = {
system.scheduler.scheduleWithFixedDelay(
initialDelay = initialDelay,
delay = nodeAppConfig.tryNextPeersInterval) { () =>
@ -148,41 +150,47 @@ case class PeerFinder(
}
}
}
}
override def start(): Future[PeerFinder] = {
logger.info(
s"Starting PeerFinder initialDelay=${initialDelay} paramPeers=$paramPeers")
val start = System.currentTimeMillis()
isStarted.set(true)
val peersToTry = (paramPeers ++ getPeersFromConfig).distinct
//higher priority for param peers
_peersToTry.pushAll(peersToTry, priority = 2)
if (!isStarted.get()) {
logger.info(
s"Starting PeerFinder initialDelay=${initialDelay} paramPeers=$paramPeers")
val start = System.currentTimeMillis()
isStarted.set(true)
val peersToTry = (paramPeers ++ getPeersFromConfig).distinct
//higher priority for param peers
_peersToTry.pushAll(peersToTry, priority = 2)
val peerDiscoveryF = if (nodeAppConfig.enablePeerDiscovery) {
val startedF = for {
(dbNonCf, dbCf) <- getPeersFromDb
} yield {
_peersToTry.pushAll(getPeersFromDnsSeeds)
_peersToTry.pushAll(getPeersFromResources)
_peersToTry.pushAll(dbNonCf)
_peersToTry.pushAll(dbCf, priority = 1)
peerConnectionScheduler //start scheduler
val peerDiscoveryF = if (nodeAppConfig.enablePeerDiscovery) {
val startedF = for {
(dbNonCf, dbCf) <- getPeersFromDb
} yield {
_peersToTry.pushAll(getPeersFromDnsSeeds)
_peersToTry.pushAll(getPeersFromResources)
_peersToTry.pushAll(dbNonCf)
_peersToTry.pushAll(dbCf, priority = 1)
peerConnectionCancellableOpt = Some(peerConnectionScheduler())
this
this
}
startedF
} else {
logger.info("Peer discovery disabled.")
peerConnectionCancellableOpt = Some(peerConnectionScheduler())
Future.successful(this)
}
startedF
for {
peerFinder <- peerDiscoveryF
_ = logger.info(
s"Done starting PeerFinder, it took ${System.currentTimeMillis() - start}ms")
} yield peerFinder
} else {
logger.info("Peer discovery disabled.")
peerConnectionScheduler //start scheduler
logger.warn(s"PeerFinder already started")
Future.successful(this)
}
for {
peerFinder <- peerDiscoveryF
_ = logger.info(
s"Done starting PeerFinder, it took ${System.currentTimeMillis() - start}ms")
} yield peerFinder
}
def reconnect(peer: Peer): Future[Unit] = {
@ -197,29 +205,37 @@ case class PeerFinder(
}
override def stop(): Future[PeerFinder] = {
logger.info(s"Stopping PeerFinder")
isStarted.set(false)
//stop scheduler
peerConnectionScheduler.cancel()
//delete try queue
_peersToTry.clear()
if (isStarted.get()) {
logger.info(s"Stopping PeerFinder")
isStarted.set(false)
//stop scheduler
peerConnectionCancellableOpt.map(_.cancel())
peerConnectionCancellableOpt = None
//delete try queue
_peersToTry.clear()
val stopF = for {
_ <- Future.traverse(_peerData.map(_._1))(removePeer(_))
_ <- AsyncUtil
.retryUntilSatisfied(_peerData.isEmpty,
interval = 1.seconds,
maxTries = 30)
} yield {
logger.info(s"Done stopping PeerFinder")
this
val stopF = for {
_ <- Future.traverse(_peerData.map(_._1))(removePeer(_))
_ <- AsyncUtil
.retryUntilSatisfied(_peerData.isEmpty,
interval = 1.seconds,
maxTries = 30)
} yield {
logger.info(s"Done stopping PeerFinder")
this
}
stopF.failed.foreach { e =>
logger.error(
s"Failed to stop peer finder. Peers: ${_peerData.map(_._1)}",
e)
}
stopF
} else {
logger.warn(s"PeerFinder already stopped")
Future.successful(this)
}
stopF.failed.foreach { e =>
logger.error(s"Failed to stop peer finder. Peers: ${_peerData.map(_._1)}",
e)
}
stopF
}
/** creates and initialises a new test peer */