diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index 7e66b2eb0f..4d3e0c6193 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -1,6 +1,6 @@ package org.bitcoins.node -import akka.actor.{ActorRef, ActorSystem, Props} +import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} import akka.stream.OverflowStrategy import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete} import org.bitcoins.asyncutil.AsyncUtil @@ -17,8 +17,8 @@ import scodec.bits.ByteVector import java.net.InetAddress import java.time.Duration import scala.collection.mutable -import scala.concurrent.duration.DurationInt -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.{DurationInt} +import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.Random case class PeerManager( @@ -120,26 +120,43 @@ case class PeerManager( serviceIdentifier) } - def awaitPeerWithService( + private var peerServicesQueries: Vector[Cancellable] = Vector.empty + + private def awaitPeerWithService( services: ServiceIdentifier, timeout: Duration): Future[Unit] = { logger.debug(s"Waiting for peer connection. ${_peerData.keys}") - - val ret = AsyncUtil - .retryUntilSatisfied( - { + val promise = Promise[Unit]() + var counter = 0 + val cancellable = + system.scheduler.scheduleAtFixedRate(0.seconds, 1.second) { () => + if ( _peerData.exists(x => x._2.serviceIdentifier.hasServicesOf(services)) - }, - interval = 1.seconds, - maxTries = timeout.getSeconds.toInt) - .recover { - case _: AsyncUtil.RpcRetryException => - throw new RuntimeException( - s"No supported peers found! Requested: ${services}") - case unknown: Throwable => throw unknown + ) { + promise.success(()) + } else if (counter == timeout.getSeconds.toInt) { + promise.failure( + new RuntimeException( + s"No supported peers found! Requested: ${services}")) + } else { + counter += 1 + } } - ret + peerServicesQueries = peerServicesQueries.appended(cancellable) + + //remove the cancellable from the peerServicesQueries + //when our promise is completed from the scheduled job + promise.future.onComplete { _ => + val _: Boolean = cancellable.cancel() + val idx = peerServicesQueries.indexOf(cancellable) + if (idx >= 0) { + peerServicesQueries = peerServicesQueries.zipWithIndex + .filter(_._2 != idx) + .map(_._1) + } + } + promise.future } def replacePeer(replacePeer: Peer, withPeer: Peer): Future[Unit] = { @@ -186,6 +203,8 @@ case class PeerManager( val finderStopF = finder.stop() + peerServicesQueries.foreach(_.cancel()) //reset the peerServicesQueries var? + val removeF = Future.sequence(peers.map(removePeer)) val managerStopF = AsyncUtil.retryUntilSatisfied(