Improve bitcoind connection retry logic (#4386)

This commit is contained in:
rorp 2022-06-14 06:40:04 -07:00 committed by GitHub
parent fdf281b469
commit 1ad540703c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 22 deletions

View File

@ -13,6 +13,7 @@ import akka.stream.scaladsl.{
}
import akka.{Done, NotUsed}
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.asyncutil.AsyncUtil.Exponential
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models._
@ -243,8 +244,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
val res = infoF.map(promise.success).map(_ => true)
res.recover { case _: InWarmUp => false }
},
// retry for approximately 2 hours
mode = Exponential,
interval = 1.second,
maxTries = 100
maxTries = 12
)
info <- promise.future
} yield info
@ -481,10 +484,22 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
private def syncWalletWithBitcoindAndStartPolling(
bitcoind: BitcoindRpcClient,
wallet: Wallet): Future[Unit] = {
val f = BitcoindRpcBackendUtil
.syncWalletToBitcoind(bitcoind, wallet)
.flatMap(_ => wallet.updateUtxoPendingStates())
.flatMap { _ =>
val f = for {
_ <- AsyncUtil.retryUntilSatisfiedF(
conditionF = { () =>
for {
bitcoindHeight <- bitcoind.getBlockCount
walletStateOpt <- wallet.getSyncDescriptorOpt()
} yield walletStateOpt.forall(bitcoindHeight >= _.height)
},
// retry for approximately 2 hours
mode = Exponential,
interval = 1.second,
maxTries = 12
)
_ <- BitcoindRpcBackendUtil.syncWalletToBitcoind(bitcoind, wallet)
_ <- wallet.updateUtxoPendingStates()
_ <-
if (bitcoindRpcConf.zmqConfig == ZmqConfig.empty) {
BitcoindRpcBackendUtil
.startBitcoindBlockPolling(wallet, bitcoind)
@ -503,7 +518,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
bitcoindRpcConf.zmqConfig)
}
}
}
} yield ()
f.failed.foreach(err =>
logger.error(s"Error syncing bitcoin-s wallet with bitcoind", err))

View File

@ -415,7 +415,10 @@ object BitcoindRpcBackendUtil extends Logging {
}
//don't want to execute these in parallel
val processTxFlow = Sink.foreachAsync[Transaction](1)(processTx)
val processTxFlow = Sink.foreachAsync[Option[Transaction]](1) {
case Some(tx) => processTx(tx)
case None => Future.unit
}
val res = for {
mempool <- bitcoind.getRawMemPool
@ -430,9 +433,6 @@ object BitcoindRpcBackendUtil extends Logging {
.recover { case _: Throwable =>
None
}
.collect { case Some(tx) =>
tx
}
}
.toMat(processTxFlow)(Keep.right)
.run()

View File

@ -30,6 +30,10 @@ abstract class AsyncUtil extends AsyncUtilApi {
retryUntilSatisfiedF(f, interval, maxTries)
}
sealed trait RetryMode
case object Linear extends RetryMode
case object Exponential extends RetryMode
/** The returned Future completes when condition becomes true
* @param conditionF The condition being waited on
* @param duration The interval between calls to check condition
@ -40,15 +44,23 @@ abstract class AsyncUtil extends AsyncUtilApi {
def retryUntilSatisfiedF(
conditionF: () => Future[Boolean],
interval: FiniteDuration = AsyncUtil.DEFAULT_INTERVAL,
maxTries: Int = DEFAULT_MAX_TRIES)(implicit
ec: ExecutionContext): Future[Unit] = {
maxTries: Int = DEFAULT_MAX_TRIES,
mode: RetryMode = Linear)(implicit ec: ExecutionContext): Future[Unit] = {
if (mode == Exponential) {
val millis = interval.toMillis
if (millis > 0) {
require((millis << maxTries) > 0,
s"Too many tries for retryUntilSatisfied(): $maxTries")
}
}
val stackTrace: Array[StackTraceElement] =
Thread.currentThread().getStackTrace
retryUntilSatisfiedWithCounter(conditionF = conditionF,
interval = interval,
maxTries = maxTries,
stackTrace = stackTrace)
stackTrace = stackTrace,
mode = mode)
}
// Has a different name so that default values are permitted
@ -57,8 +69,8 @@ abstract class AsyncUtil extends AsyncUtilApi {
interval: FiniteDuration,
counter: Int = 0,
maxTries: Int,
stackTrace: Array[StackTraceElement])(implicit
ec: ExecutionContext): Future[Unit] = {
stackTrace: Array[StackTraceElement],
mode: RetryMode)(implicit ec: ExecutionContext): Future[Unit] = {
conditionF().flatMap { condition =>
if (condition) {
Future.unit
@ -70,9 +82,12 @@ abstract class AsyncUtil extends AsyncUtilApi {
val p = Promise[Boolean]()
val runnable = retryRunnable(condition, p)
AsyncUtil.scheduler.scheduleOnce(interval.toMillis,
TimeUnit.MILLISECONDS,
runnable)
val delay: Long = mode match {
case Linear => interval.toMillis
case Exponential => interval.toMillis << counter
}
AsyncUtil.scheduler.scheduleOnce(delay, TimeUnit.MILLISECONDS, runnable)
p.future.flatMap {
case true => Future.unit
@ -81,7 +96,8 @@ abstract class AsyncUtil extends AsyncUtilApi {
interval = interval,
counter = counter + 1,
maxTries = maxTries,
stackTrace = stackTrace)
stackTrace = stackTrace,
mode = mode)
}
}
}

View File

@ -13,14 +13,15 @@ abstract class TestAsyncUtil extends AsyncUtil with Serializable {
duration: FiniteDuration,
counter: Int,
maxTries: Int,
stackTrace: Array[StackTraceElement])(implicit
ec: ExecutionContext): Future[Unit] = {
stackTrace: Array[StackTraceElement],
mode: RetryMode)(implicit ec: ExecutionContext): Future[Unit] = {
val retryF = super
.retryUntilSatisfiedWithCounter(conditionF,
duration,
counter,
maxTries,
stackTrace)
stackTrace,
mode)
TestAsyncUtil.transformRetryToTestFailure(retryF)
}