mirror of
https://github.com/bitcoin-s/bitcoin-s.git
synced 2025-03-15 12:20:06 +01:00
RpcRetryException (#274)
* Created RpcRetryException for better error messages on RpcUtil timeouts * De-duplicated RpcUtil * Ran scalafmt * Fixed an import and a version
This commit is contained in:
parent
22806bfd0d
commit
780302d92f
11 changed files with 83 additions and 306 deletions
|
@ -1,138 +0,0 @@
|
|||
package org.bitcoins
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import org.bitcoins.core.util.BitcoinSLogger
|
||||
import org.bitcoins.rpc.client.BitcoindRpcClient
|
||||
|
||||
import scala.concurrent.duration.{DurationInt, FiniteDuration}
|
||||
import scala.concurrent.{Await, Future, Promise}
|
||||
|
||||
trait RpcUtil extends BitcoinSLogger {
|
||||
|
||||
private def retryRunnable(
|
||||
condition: => Boolean,
|
||||
p: Promise[Boolean]): Runnable = new Runnable {
|
||||
override def run(): Unit = {
|
||||
p.success(condition)
|
||||
()
|
||||
}
|
||||
}
|
||||
|
||||
def retryUntilSatisfied(
|
||||
condition: => Boolean,
|
||||
duration: FiniteDuration = 100.milliseconds,
|
||||
maxTries: Int = 50)(implicit system: ActorSystem): Future[Unit] = {
|
||||
val f = () => Future.successful(condition)
|
||||
retryUntilSatisfiedF(f, duration, maxTries)
|
||||
}
|
||||
|
||||
/**
|
||||
* The returned Future completes when condition becomes true
|
||||
* @param conditionF The condition being waited on
|
||||
* @param duration The interval between calls to check condition
|
||||
* @param maxTries If condition is tried this many times, the Future fails
|
||||
* @param system An ActorSystem to schedule calls to condition
|
||||
* @return A Future[Unit] that succeeds if condition becomes true and fails otherwise
|
||||
*/
|
||||
def retryUntilSatisfiedF(
|
||||
conditionF: () => Future[Boolean],
|
||||
duration: FiniteDuration = 100.milliseconds,
|
||||
maxTries: Int = 50)(implicit system: ActorSystem): Future[Unit] = {
|
||||
|
||||
retryUntilSatisfiedWithCounter(conditionF = conditionF,
|
||||
duration = duration,
|
||||
maxTries = maxTries)
|
||||
}
|
||||
|
||||
// Has a different name so that default values are permitted
|
||||
private def retryUntilSatisfiedWithCounter(
|
||||
conditionF: () => Future[Boolean],
|
||||
duration: FiniteDuration,
|
||||
counter: Int = 0,
|
||||
maxTries: Int)(implicit system: ActorSystem): Future[Unit] = {
|
||||
|
||||
implicit val ec = system.dispatcher
|
||||
|
||||
conditionF().flatMap { condition =>
|
||||
if (condition) {
|
||||
Future.successful(())
|
||||
} else if (counter == maxTries) {
|
||||
Future.failed(new RuntimeException("Condition timed out"))
|
||||
} else {
|
||||
|
||||
val p = Promise[Boolean]()
|
||||
val runnable = retryRunnable(condition, p)
|
||||
|
||||
system.scheduler.scheduleOnce(duration, runnable)
|
||||
|
||||
p.future.flatMap {
|
||||
case true => Future.successful(())
|
||||
case false =>
|
||||
retryUntilSatisfiedWithCounter(conditionF,
|
||||
duration,
|
||||
counter + 1,
|
||||
maxTries)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks until condition becomes true, the condition
|
||||
* is checked maxTries times, or overallTimeout is reached
|
||||
* @param condition The blocking condition
|
||||
* @param duration The interval between calls to check condition
|
||||
* @param maxTries If condition is tried this many times, an exception is thrown
|
||||
* @param overallTimeout If this much time passes, an exception is thrown.
|
||||
* This exists in case calls to condition take significant time,
|
||||
* otherwise just use duration and maxTries to configure timeout.
|
||||
* @param system An ActorSystem to schedule calls to condition
|
||||
*/
|
||||
def awaitCondition(
|
||||
condition: () => Boolean,
|
||||
duration: FiniteDuration = 100.milliseconds,
|
||||
maxTries: Int = 50,
|
||||
overallTimeout: FiniteDuration = 1.hour)(
|
||||
implicit system: ActorSystem): Unit = {
|
||||
|
||||
//type hackery here to go from () => Boolean to () => Future[Boolean]
|
||||
//to make sure we re-evaluate every time retryUntilSatisfied is called
|
||||
def conditionDef: Boolean = condition()
|
||||
val conditionF: () => Future[Boolean] = () =>
|
||||
Future.successful(conditionDef)
|
||||
|
||||
awaitConditionF(conditionF, duration, maxTries, overallTimeout)
|
||||
}
|
||||
|
||||
def awaitConditionF(
|
||||
conditionF: () => Future[Boolean],
|
||||
duration: FiniteDuration = 100.milliseconds,
|
||||
maxTries: Int = 50,
|
||||
overallTimeout: FiniteDuration = 1.hour)(
|
||||
implicit system: ActorSystem): Unit = {
|
||||
|
||||
val f: Future[Unit] = retryUntilSatisfiedF(conditionF = conditionF,
|
||||
duration = duration,
|
||||
maxTries = maxTries)
|
||||
|
||||
Await.result(f, overallTimeout)
|
||||
}
|
||||
|
||||
def awaitServer(
|
||||
server: BitcoindRpcClient,
|
||||
duration: FiniteDuration = 1.seconds,
|
||||
maxTries: Int = 50)(implicit system: ActorSystem): Unit = {
|
||||
val f = () => server.isStarted
|
||||
awaitCondition(f, duration, maxTries)
|
||||
}
|
||||
|
||||
def awaitServerShutdown(
|
||||
server: BitcoindRpcClient,
|
||||
duration: FiniteDuration = 300.milliseconds,
|
||||
maxTries: Int = 50)(implicit system: ActorSystem): Unit = {
|
||||
val f = () => !server.isStarted
|
||||
awaitCondition(f, duration, maxTries)
|
||||
}
|
||||
}
|
||||
|
||||
object RpcUtil extends RpcUtil
|
|
@ -21,11 +21,11 @@ import org.bitcoins.core.protocol.transaction.{
|
|||
}
|
||||
import org.bitcoins.core.protocol.{BitcoinAddress, P2PKHAddress}
|
||||
import org.bitcoins.core.util.{BitcoinSLogger, BitcoinSUtil}
|
||||
import org.bitcoins.rpc.RpcUtil
|
||||
import org.bitcoins.rpc.client.RpcOpts.AddressType
|
||||
import org.bitcoins.rpc.config.BitcoindInstance
|
||||
import org.bitcoins.rpc.jsonmodels._
|
||||
import org.bitcoins.rpc.serializers.JsonSerializers._
|
||||
import org.bitcoins.rpc.util.RpcUtil
|
||||
import play.api.libs.json._
|
||||
|
||||
import scala.concurrent.duration.DurationInt
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
package org.bitcoins.rpc
|
||||
package org.bitcoins.rpc.util
|
||||
|
||||
import java.io.PrintStream
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import org.bitcoins.core.util.BitcoinSLogger
|
||||
import org.bitcoins.rpc.client.BitcoindRpcClient
|
||||
|
||||
import scala.concurrent.duration.{DurationInt, FiniteDuration}
|
||||
import scala.concurrent.{Await, Future, Promise}
|
||||
|
||||
trait RpcUtil extends BitcoinSLogger {
|
||||
trait AsyncUtil extends BitcoinSLogger {
|
||||
|
||||
private def retryRunnable(
|
||||
condition: => Boolean,
|
||||
|
@ -38,10 +39,37 @@ trait RpcUtil extends BitcoinSLogger {
|
|||
conditionF: () => Future[Boolean],
|
||||
duration: FiniteDuration = 100.millis,
|
||||
maxTries: Int = 50)(implicit system: ActorSystem): Future[Unit] = {
|
||||
val stackTrace: Array[StackTraceElement] =
|
||||
Thread.currentThread().getStackTrace
|
||||
|
||||
retryUntilSatisfiedWithCounter(conditionF = conditionF,
|
||||
duration = duration,
|
||||
maxTries = maxTries)
|
||||
maxTries = maxTries,
|
||||
stackTrace = stackTrace)
|
||||
}
|
||||
|
||||
case class RpcRetryException(
|
||||
message: String,
|
||||
caller: Array[StackTraceElement])
|
||||
extends Exception(message) {
|
||||
override def printStackTrace(s: PrintStream): Unit = {
|
||||
super.printStackTrace(s)
|
||||
|
||||
val indexOfLastBitcoinSOpt = caller.reverse.zipWithIndex
|
||||
.find {
|
||||
case (element, _) =>
|
||||
element.getClassName.contains("bitcoins")
|
||||
}
|
||||
|
||||
val indexOfLastRelevantElement =
|
||||
indexOfLastBitcoinSOpt.map(_._2).getOrElse(0)
|
||||
|
||||
val relevantStackTrace =
|
||||
caller.dropRight(indexOfLastRelevantElement).drop(1)
|
||||
|
||||
s.println("Called from:")
|
||||
relevantStackTrace.foreach(element => s.println(s"\t$element"))
|
||||
}
|
||||
}
|
||||
|
||||
// Has a different name so that default values are permitted
|
||||
|
@ -49,7 +77,9 @@ trait RpcUtil extends BitcoinSLogger {
|
|||
conditionF: () => Future[Boolean],
|
||||
duration: FiniteDuration,
|
||||
counter: Int = 0,
|
||||
maxTries: Int)(implicit system: ActorSystem): Future[Unit] = {
|
||||
maxTries: Int,
|
||||
stackTrace: Array[StackTraceElement])(
|
||||
implicit system: ActorSystem): Future[Unit] = {
|
||||
|
||||
implicit val ec = system.dispatcher
|
||||
|
||||
|
@ -57,7 +87,7 @@ trait RpcUtil extends BitcoinSLogger {
|
|||
if (condition) {
|
||||
Future.successful(())
|
||||
} else if (counter == maxTries) {
|
||||
Future.failed(new RuntimeException("Condition timed out"))
|
||||
Future.failed(RpcRetryException("Condition timed out", stackTrace))
|
||||
} else {
|
||||
val p = Promise[Boolean]()
|
||||
val runnable = retryRunnable(condition, p)
|
||||
|
@ -70,7 +100,8 @@ trait RpcUtil extends BitcoinSLogger {
|
|||
retryUntilSatisfiedWithCounter(conditionF,
|
||||
duration,
|
||||
counter + 1,
|
||||
maxTries)
|
||||
maxTries,
|
||||
stackTrace)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -116,22 +147,6 @@ trait RpcUtil extends BitcoinSLogger {
|
|||
|
||||
Await.result(f, overallTimeout)
|
||||
}
|
||||
|
||||
def awaitServer(
|
||||
server: BitcoindRpcClient,
|
||||
duration: FiniteDuration = 1.seconds,
|
||||
maxTries: Int = 50)(implicit system: ActorSystem): Unit = {
|
||||
val f = () => server.isStarted
|
||||
awaitCondition(f, duration, maxTries)
|
||||
}
|
||||
|
||||
def awaitServerShutdown(
|
||||
server: BitcoindRpcClient,
|
||||
duration: FiniteDuration = 300.milliseconds,
|
||||
maxTries: Int = 50)(implicit system: ActorSystem): Unit = {
|
||||
val f = () => !server.isStarted
|
||||
awaitCondition(f, duration, maxTries)
|
||||
}
|
||||
}
|
||||
|
||||
object RpcUtil extends RpcUtil
|
||||
object AsyncUtil extends AsyncUtil
|
27
rpc/src/main/scala/org/bitcoins/rpc/util/RpcUtil.scala
Normal file
27
rpc/src/main/scala/org/bitcoins/rpc/util/RpcUtil.scala
Normal file
|
@ -0,0 +1,27 @@
|
|||
package org.bitcoins.rpc.util
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import org.bitcoins.rpc.client.BitcoindRpcClient
|
||||
|
||||
import scala.concurrent.duration.{DurationInt, FiniteDuration}
|
||||
|
||||
trait RpcUtil extends AsyncUtil {
|
||||
|
||||
def awaitServer(
|
||||
server: BitcoindRpcClient,
|
||||
duration: FiniteDuration = 1.seconds,
|
||||
maxTries: Int = 50)(implicit system: ActorSystem): Unit = {
|
||||
val f = () => server.isStarted
|
||||
awaitCondition(f, duration, maxTries)
|
||||
}
|
||||
|
||||
def awaitServerShutdown(
|
||||
server: BitcoindRpcClient,
|
||||
duration: FiniteDuration = 300.milliseconds,
|
||||
maxTries: Int = 50)(implicit system: ActorSystem): Unit = {
|
||||
val f = () => !server.isStarted
|
||||
awaitCondition(f, duration, maxTries)
|
||||
}
|
||||
}
|
||||
|
||||
object RpcUtil extends RpcUtil
|
|
@ -7,6 +7,7 @@ import akka.testkit.TestKit
|
|||
import org.bitcoins.core.currency.Bitcoins
|
||||
import org.bitcoins.rpc.client.BitcoindRpcClient
|
||||
import org.bitcoins.rpc.config.BitcoindInstance
|
||||
import org.bitcoins.rpc.util.RpcUtil
|
||||
import org.scalatest.{AsyncFlatSpec, BeforeAndAfterAll}
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.bitcoins.rpc.jsonmodels.{
|
|||
GetTransactionResult,
|
||||
RpcAddress
|
||||
}
|
||||
import org.bitcoins.rpc.util.RpcUtil
|
||||
import org.scalatest.{AsyncFlatSpec, BeforeAndAfter, BeforeAndAfterAll}
|
||||
|
||||
import scala.concurrent.duration.DurationInt
|
||||
|
|
|
@ -4,7 +4,9 @@ import java.io.File
|
|||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.ActorMaterializer
|
||||
import org.bitcoins.rpc.util.RpcUtil.RpcRetryException
|
||||
import org.bitcoins.rpc.client.BitcoindRpcClient
|
||||
import org.bitcoins.rpc.util.RpcUtil
|
||||
import org.scalatest.{AsyncFlatSpec, BeforeAndAfterAll}
|
||||
|
||||
import scala.concurrent.duration.DurationInt
|
||||
|
@ -44,7 +46,7 @@ class RpcUtilTest extends AsyncFlatSpec with BeforeAndAfterAll {
|
|||
}
|
||||
|
||||
it should "fail if condition is false" in {
|
||||
recoverToSucceededIf[RuntimeException] {
|
||||
recoverToSucceededIf[RpcRetryException] {
|
||||
RpcUtil.retryUntilSatisfiedF(conditionF = () => Future.successful(false),
|
||||
duration = 0.millis)
|
||||
}
|
||||
|
@ -59,7 +61,7 @@ class RpcUtilTest extends AsyncFlatSpec with BeforeAndAfterAll {
|
|||
|
||||
it should "fail if there is a delay and duration is zero" in {
|
||||
val boolLater = trueLater(delay = 250)
|
||||
recoverToSucceededIf[RuntimeException] {
|
||||
recoverToSucceededIf[RpcRetryException] {
|
||||
RpcUtil.retryUntilSatisfiedF(boolLaterDoneAndTrue(boolLater),
|
||||
duration = 0.millis)
|
||||
}
|
||||
|
@ -71,7 +73,7 @@ class RpcUtilTest extends AsyncFlatSpec with BeforeAndAfterAll {
|
|||
}
|
||||
|
||||
it should "timeout if condition is false" in {
|
||||
assertThrows[RuntimeException] {
|
||||
assertThrows[RpcRetryException] {
|
||||
RpcUtil.awaitCondition(condition = () => false, duration = 0.millis)
|
||||
}
|
||||
}
|
||||
|
@ -86,7 +88,7 @@ class RpcUtilTest extends AsyncFlatSpec with BeforeAndAfterAll {
|
|||
|
||||
it should "timeout if there is a delay and duration is zero" in {
|
||||
val boolLater = trueLater(delay = 250)
|
||||
assertThrows[RuntimeException] {
|
||||
assertThrows[RpcRetryException] {
|
||||
RpcUtil.awaitConditionF(boolLaterDoneAndTrue(boolLater),
|
||||
duration = 0.millis)
|
||||
}
|
||||
|
|
|
@ -18,7 +18,8 @@ import org.bitcoins.eclair.rpc.client.EclairRpcClient
|
|||
import org.bitcoins.eclair.rpc.config.EclairInstance
|
||||
import org.bitcoins.rpc.client.BitcoindRpcClient
|
||||
import org.bitcoins.rpc.config.{BitcoindInstance, ZmqConfig}
|
||||
import org.bitcoins.rpc.{BitcoindRpcTestUtil, RpcUtil}
|
||||
import org.bitcoins.rpc.BitcoindRpcTestUtil
|
||||
import org.bitcoins.rpc.util.RpcUtil
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration.DurationInt
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.bitcoins.rpc.config.{
|
|||
BitcoindInstance,
|
||||
ZmqConfig
|
||||
}
|
||||
import org.bitcoins.rpc.util.RpcUtil
|
||||
|
||||
import scala.collection.immutable.Map
|
||||
import scala.collection.JavaConverters.{asScalaSet, mapAsJavaMap}
|
||||
|
|
|
@ -1,138 +0,0 @@
|
|||
package org.bitcoins
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import org.bitcoins.core.util.BitcoinSLogger
|
||||
import org.bitcoins.rpc.client.BitcoindRpcClient
|
||||
|
||||
import scala.concurrent.duration.{DurationInt, FiniteDuration}
|
||||
import scala.concurrent.{Await, Future, Promise}
|
||||
|
||||
trait RpcUtil extends BitcoinSLogger {
|
||||
|
||||
private def retryRunnable(
|
||||
condition: => Boolean,
|
||||
p: Promise[Boolean]): Runnable = new Runnable {
|
||||
override def run(): Unit = {
|
||||
p.success(condition)
|
||||
()
|
||||
}
|
||||
}
|
||||
|
||||
def retryUntilSatisfied(
|
||||
condition: => Boolean,
|
||||
duration: FiniteDuration = 100.milliseconds,
|
||||
maxTries: Int = 50)(implicit system: ActorSystem): Future[Unit] = {
|
||||
val f = () => Future.successful(condition)
|
||||
retryUntilSatisfiedF(f, duration, maxTries)
|
||||
}
|
||||
|
||||
/**
|
||||
* The returned Future completes when condition becomes true
|
||||
* @param conditionF The condition being waited on
|
||||
* @param duration The interval between calls to check condition
|
||||
* @param maxTries If condition is tried this many times, the Future fails
|
||||
* @param system An ActorSystem to schedule calls to condition
|
||||
* @return A Future[Unit] that succeeds if condition becomes true and fails otherwise
|
||||
*/
|
||||
def retryUntilSatisfiedF(
|
||||
conditionF: () => Future[Boolean],
|
||||
duration: FiniteDuration = 100.milliseconds,
|
||||
maxTries: Int = 50)(implicit system: ActorSystem): Future[Unit] = {
|
||||
|
||||
retryUntilSatisfiedWithCounter(conditionF = conditionF,
|
||||
duration = duration,
|
||||
maxTries = maxTries)
|
||||
}
|
||||
|
||||
// Has a different name so that default values are permitted
|
||||
private def retryUntilSatisfiedWithCounter(
|
||||
conditionF: () => Future[Boolean],
|
||||
duration: FiniteDuration,
|
||||
counter: Int = 0,
|
||||
maxTries: Int)(implicit system: ActorSystem): Future[Unit] = {
|
||||
|
||||
implicit val ec = system.dispatcher
|
||||
|
||||
conditionF().flatMap { condition =>
|
||||
if (condition) {
|
||||
Future.successful(())
|
||||
} else if (counter == maxTries) {
|
||||
Future.failed(new RuntimeException("Condition timed out"))
|
||||
} else {
|
||||
|
||||
val p = Promise[Boolean]()
|
||||
val runnable = retryRunnable(condition, p)
|
||||
|
||||
system.scheduler.scheduleOnce(duration, runnable)
|
||||
|
||||
p.future.flatMap {
|
||||
case true => Future.successful(())
|
||||
case false =>
|
||||
retryUntilSatisfiedWithCounter(conditionF,
|
||||
duration,
|
||||
counter + 1,
|
||||
maxTries)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks until condition becomes true, the condition
|
||||
* is checked maxTries times, or overallTimeout is reached
|
||||
* @param condition The blocking condition
|
||||
* @param duration The interval between calls to check condition
|
||||
* @param maxTries If condition is tried this many times, an exception is thrown
|
||||
* @param overallTimeout If this much time passes, an exception is thrown.
|
||||
* This exists in case calls to condition take significant time,
|
||||
* otherwise just use duration and maxTries to configure timeout.
|
||||
* @param system An ActorSystem to schedule calls to condition
|
||||
*/
|
||||
def awaitCondition(
|
||||
condition: () => Boolean,
|
||||
duration: FiniteDuration = 100.milliseconds,
|
||||
maxTries: Int = 50,
|
||||
overallTimeout: FiniteDuration = 1.hour)(
|
||||
implicit system: ActorSystem): Unit = {
|
||||
|
||||
//type hackery here to go from () => Boolean to () => Future[Boolean]
|
||||
//to make sure we re-evaluate every time retryUntilSatisfied is called
|
||||
def conditionDef: Boolean = condition()
|
||||
val conditionF: () => Future[Boolean] = () =>
|
||||
Future.successful(conditionDef)
|
||||
|
||||
awaitConditionF(conditionF, duration, maxTries, overallTimeout)
|
||||
}
|
||||
|
||||
def awaitConditionF(
|
||||
conditionF: () => Future[Boolean],
|
||||
duration: FiniteDuration = 100.milliseconds,
|
||||
maxTries: Int = 50,
|
||||
overallTimeout: FiniteDuration = 1.hour)(
|
||||
implicit system: ActorSystem): Unit = {
|
||||
|
||||
val f: Future[Unit] = retryUntilSatisfiedF(conditionF = conditionF,
|
||||
duration = duration,
|
||||
maxTries = maxTries)
|
||||
|
||||
Await.result(f, overallTimeout)
|
||||
}
|
||||
|
||||
def awaitServer(
|
||||
server: BitcoindRpcClient,
|
||||
duration: FiniteDuration = 1.seconds,
|
||||
maxTries: Int = 50)(implicit system: ActorSystem): Unit = {
|
||||
val f = () => server.isStarted
|
||||
awaitCondition(f, duration, maxTries)
|
||||
}
|
||||
|
||||
def awaitServerShutdown(
|
||||
server: BitcoindRpcClient,
|
||||
duration: FiniteDuration = 300.milliseconds,
|
||||
maxTries: Int = 50)(implicit system: ActorSystem): Unit = {
|
||||
val f = () => !server.isStarted
|
||||
awaitCondition(f, duration, maxTries)
|
||||
}
|
||||
}
|
||||
|
||||
object RpcUtil extends RpcUtil
|
5
testkit/src/main/scala/org/bitcoins/util/AsyncUtil.scala
Normal file
5
testkit/src/main/scala/org/bitcoins/util/AsyncUtil.scala
Normal file
|
@ -0,0 +1,5 @@
|
|||
package org.bitcoins.util
|
||||
|
||||
trait AsyncUtil extends org.bitcoins.rpc.util.AsyncUtil
|
||||
|
||||
object AsyncUtil extends AsyncUtil
|
Loading…
Add table
Reference in a new issue