core: Terminate rescan early when RescanStarted.stop() is called rather than wait for the rescan to complete (#5749)

Add checks if recursiveRescanP is completed before attempting to fail the Future

Fix potential deadlock in RescanStarted.stop(), add RescanState test

Add unit test to make sure we propagate exceptions correctly

Move RescanStateTest to jvm only

Link completeRescanEarlyP and blocksMatchedF together in RescanStateTest

Revert DLCWalletLoaderApi.scala
This commit is contained in:
Chris Stewart 2024-10-30 16:47:39 -05:00 committed by GitHub
parent 18c74c1351
commit 5187eee42a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 95 additions and 7 deletions

View File

@ -0,0 +1,71 @@
package org.bitcoins.core.wallet.rescan
import org.bitcoins.core.wallet.rescan.RescanState.{
RescanStarted,
RescanTerminatedEarly
}
import org.bitcoins.testkit.util.BitcoinSAsyncTest
import scala.concurrent.Promise
class RescanStateTest extends BitcoinSAsyncTest {
behavior of "RescanState"
it must "stop a rescan that has started" in {
val completeRescanEarlyP = Promise[Option[Int]]()
val recursiveRescanP = Promise[RescanState]()
val blocksMatchedF = completeRescanEarlyP.future.map(_ => Vector.empty)
val rescanState = RescanState.RescanStarted(completeRescanEarlyP,
blocksMatchedF,
recursiveRescanP)
for {
blockMatches <- rescanState.stop()
} yield assert(blockMatches.isEmpty)
}
it must "track a single rescan correctly" in {
val completeRescanEarlyP = Promise[Option[Int]]()
val recursiveRescanP = Promise[RescanState]()
val blocksMatchedF = completeRescanEarlyP.future.map(_ => Vector.empty)
val rescanState = RescanState.RescanStarted(completeRescanEarlyP,
blocksMatchedF,
recursiveRescanP)
val _ = completeRescanEarlyP.success(None)
for {
_ <- rescanState.singleRescanDoneF
_ = assert(!rescanState.entireRescanDoneF.isCompleted)
_ = recursiveRescanP.success(RescanState.RescanNotNeeded)
_ <- rescanState.entireRescanDoneF
} yield succeed
}
it must "propagate an exception if its not RescanTerminatedEarly" in {
val completeRescanEarlyP = Promise[Option[Int]]()
val completeRescanEarly1P = Promise[Option[Int]]()
val recursiveRescanP = Promise[RescanState]()
val recursiveRescan1P = Promise[RescanState]()
val blocksMatchedF = completeRescanEarlyP.future.map(_ => Vector.empty)
val blocksMatched1F = completeRescanEarly1P.future.map(_ => Vector.empty)
val rescanState =
RescanState.RescanStarted(completeRescanEarlyP = completeRescanEarlyP,
blocksMatchedF = blocksMatchedF,
recursiveRescanP = recursiveRescanP)
val rescanState1 =
RescanStarted(completeRescanEarly1P, blocksMatched1F, recursiveRescan1P)
val _ = rescanState.fail(RescanTerminatedEarly)
val resultF = for {
vec0 <- rescanState.entireRescanDoneF
_ = assert(vec0.isEmpty)
_ = rescanState1.fail(
new RuntimeException("Should fail with generic exception"))
_ <- rescanState1.stop()
} yield {
succeed
}
recoverToSucceededIf[RuntimeException](resultF)
}
}

View File

@ -48,7 +48,9 @@ object RescanState {
completeRescanEarlyP.future.failed.foreach {
case RescanTerminatedEarly =>
recursiveRescanP.failure(RescanTerminatedEarly)
if (!recursiveRescanP.isCompleted) {
recursiveRescanP.failure(RescanTerminatedEarly)
}
_isCompletedEarly.set(true)
case _: Throwable => // do nothing
}
@ -65,14 +67,16 @@ object RescanState {
* completed
*/
def singleRescanDoneF: Future[Vector[BlockMatchingResponse]] =
blocksMatchedF
blocksMatchedF.recover { case RescanTerminatedEarly =>
Vector.empty
}
/** Means the entire rescan is done (including recursive rescans). This
* future is completed when we rescan filters with addresses do not contain
* funds within [[WalletAppConfig.addressGapLimit]]
*/
def entireRescanDoneF: Future[Vector[BlockMatchingResponse]] = {
for {
val f = for {
b0 <- blocksMatchedF
recursive <- recursiveRescanP.future
b1 <- recursive match {
@ -81,22 +85,35 @@ object RescanState {
Future.successful(Vector.empty)
}
} yield b0 ++ b1
f.recover { case RescanTerminatedEarly =>
Vector.empty
}
}
/** Fails a rescan with the given exception */
def fail(err: Throwable): Unit = {
if (!recursiveRescanP.isCompleted) {
recursiveRescanP.failure(err)
}
completeRescanEarlyP.failure(err)
recursiveRescanP.failure(err)
}
/** Completes the stream that the rescan in progress uses. This aborts the
* rescan early.
*/
def stop(): Future[Vector[BlockMatchingResponse]] = {
val stoppedRecursiveRescanF = recursiveRescanP.future.flatMap {
case started: RescanStarted => started.stop()
case RescanDone | RescanAlreadyStarted | RescanNotNeeded =>
val stoppedRecursiveRescanF = {
if (recursiveRescanP.future.isCompleted) {
recursiveRescanP.future.flatMap {
case started: RescanStarted => started.stop()
case RescanDone | RescanAlreadyStarted | RescanNotNeeded =>
Future.unit
}
} else {
fail(RescanTerminatedEarly)
Future.unit
}
}
val f = stoppedRecursiveRescanF.flatMap { _ =>