Implement ability to cancel background task for querying peer with specific services (#4937)

* Implement ability to cancel background task for querying peer with specific services

* Cancel scheduled job when promise is completed
This commit is contained in:
Chris Stewart 2023-01-02 11:39:13 -06:00 committed by GitHub
parent a7930657f9
commit 09d53460a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,6 +1,6 @@
package org.bitcoins.node
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}
import org.bitcoins.asyncutil.AsyncUtil
@ -17,8 +17,8 @@ import scodec.bits.ByteVector
import java.net.InetAddress
import java.time.Duration
import scala.collection.mutable
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.{DurationInt}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Random
case class PeerManager(
@ -120,26 +120,43 @@ case class PeerManager(
serviceIdentifier)
}
def awaitPeerWithService(
private var peerServicesQueries: Vector[Cancellable] = Vector.empty
private def awaitPeerWithService(
services: ServiceIdentifier,
timeout: Duration): Future[Unit] = {
logger.debug(s"Waiting for peer connection. ${_peerData.keys}")
val ret = AsyncUtil
.retryUntilSatisfied(
{
val promise = Promise[Unit]()
var counter = 0
val cancellable =
system.scheduler.scheduleAtFixedRate(0.seconds, 1.second) { () =>
if (
_peerData.exists(x => x._2.serviceIdentifier.hasServicesOf(services))
},
interval = 1.seconds,
maxTries = timeout.getSeconds.toInt)
.recover {
case _: AsyncUtil.RpcRetryException =>
throw new RuntimeException(
s"No supported peers found! Requested: ${services}")
case unknown: Throwable => throw unknown
) {
promise.success(())
} else if (counter == timeout.getSeconds.toInt) {
promise.failure(
new RuntimeException(
s"No supported peers found! Requested: ${services}"))
} else {
counter += 1
}
}
ret
peerServicesQueries = peerServicesQueries.appended(cancellable)
//remove the cancellable from the peerServicesQueries
//when our promise is completed from the scheduled job
promise.future.onComplete { _ =>
val _: Boolean = cancellable.cancel()
val idx = peerServicesQueries.indexOf(cancellable)
if (idx >= 0) {
peerServicesQueries = peerServicesQueries.zipWithIndex
.filter(_._2 != idx)
.map(_._1)
}
}
promise.future
}
def replacePeer(replacePeer: Peer, withPeer: Peer): Future[Unit] = {
@ -186,6 +203,8 @@ case class PeerManager(
val finderStopF = finder.stop()
peerServicesQueries.foreach(_.cancel()) //reset the peerServicesQueries var?
val removeF = Future.sequence(peers.map(removePeer))
val managerStopF = AsyncUtil.retryUntilSatisfied(