Make PeerMessageSender.reconnect() return a Future that is completed when the connection is established (#5155)

* Make PeerMessageSender.reconnect() return a Future that is completed when the connection is established

* Replace hard require() with AsyncUtil.retryUntilSatisfiedF()
This commit is contained in:
Chris Stewart 2023-07-19 13:10:15 -05:00 committed by GitHub
parent 44190a535c
commit e08469901b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 7 deletions

View File

@ -36,7 +36,7 @@ import scodec.bits.ByteVector
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.time.Instant import java.time.Instant
import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit
import scala.concurrent.Future import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.duration.{DurationInt, FiniteDuration}
case class PeerMessageSender( case class PeerMessageSender(
@ -299,14 +299,20 @@ case class PeerMessageSender(
curReconnectionTry += 1 curReconnectionTry += 1
reconnectionTry = reconnectionTry + 1 reconnectionTry = reconnectionTry + 1
val reconnectP = Promise[Unit]()
val cancellable = system.scheduler.scheduleOnce(delay) { val cancellable = system.scheduler.scheduleOnce(delay) {
val connF = connect() val connF = connect()
connF.failed.foreach(err => connF.onComplete {
logger.error(s"Failed to reconnect with peer=$peer", err)) case scala.util.Success(_) =>
() resetReconnect()
reconnectP.success(())
case scala.util.Failure(exception) =>
logger.error(s"Failed to reconnect with peer=$peer", exception)
reconnectP.failure(exception)
}
} }
reconnectionCancellableOpt = Some(cancellable) reconnectionCancellableOpt = Some(cancellable)
Future.unit reconnectP.future
} }
} }

View File

@ -482,8 +482,8 @@ object NodeUnitTest extends P2PLogger {
//see: https://github.com/bitcoin-s/bitcoin-s/issues/4976 //see: https://github.com/bitcoin-s/bitcoin-s/issues/4976
_ <- bitcoind.syncWithValidationInterfaceQueue() _ <- bitcoind.syncWithValidationInterfaceQueue()
_ <- node.sync() _ <- node.sync()
syncing <- node.chainApiFromDb().flatMap(_.isSyncing()) _ <- AsyncUtil.retryUntilSatisfiedF(() =>
_ = require(syncing) node.chainApiFromDb().flatMap(_.isSyncing()))
} yield node } yield node
} }