Add guard for calling NodeCallbackStreamManager.stop() (#4523)

* Add guard for calling NodeCallbackStreamManager.stop()

* scalafmt
This commit is contained in:
Chris Stewart 2022-07-20 10:16:26 -05:00 committed by GitHub
parent c210052640
commit f5cca7e5e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -5,6 +5,7 @@ import akka.actor.ActorSystem
import akka.stream.BoundedSourceQueue
import akka.stream.scaladsl.{Keep, Sink, Source}
import grizzled.slf4j.{Logger, Logging}
import monix.execution.atomic.AtomicBoolean
import org.bitcoins.core.api.CallbackHandler
import org.bitcoins.core.gcs.GolombFilter
import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader, MerkleBlock}
@ -112,15 +113,25 @@ case class NodeCallbackStreamManager(callbacks: NodeCallbacks)(implicit
override def start(): Future[Unit] = Future.unit
private val isStopped: AtomicBoolean = AtomicBoolean(false)
/** Completes all streams and waits until they are fully drained */
override def stop(): Future[Unit] = {
val start = System.currentTimeMillis()
//complete all queues
filterQueue.complete()
txQueue.complete()
headerQueue.complete()
merkleBlockQueue.complete()
blockQueue.complete()
//can't complete a stream twice
if (!isStopped.get()) {
//complete all queues
filterQueue.complete()
txQueue.complete()
headerQueue.complete()
merkleBlockQueue.complete()
blockQueue.complete()
isStopped.set(true)
} else {
logger.warn(
s"Already stopped all queues associated with this NodeCallBackStreamManager")
}
for {
_ <- filterSinkCompleteF