From f5cca7e5e1a971d8f742ae83df6956b127997ea9 Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Wed, 20 Jul 2022 10:16:26 -0500 Subject: [PATCH] Add guard for calling NodeCallbackStreamManager.stop() (#4523) * Add guard for calling NodeCallbackStreamManager.stop() * scalafmt --- .../callback/NodeCallbackStreamManager.scala | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/node/src/main/scala/org/bitcoins/node/callback/NodeCallbackStreamManager.scala b/node/src/main/scala/org/bitcoins/node/callback/NodeCallbackStreamManager.scala index 38e5a0f23a..7ec9338e63 100644 --- a/node/src/main/scala/org/bitcoins/node/callback/NodeCallbackStreamManager.scala +++ b/node/src/main/scala/org/bitcoins/node/callback/NodeCallbackStreamManager.scala @@ -5,6 +5,7 @@ import akka.actor.ActorSystem import akka.stream.BoundedSourceQueue import akka.stream.scaladsl.{Keep, Sink, Source} import grizzled.slf4j.{Logger, Logging} +import monix.execution.atomic.AtomicBoolean import org.bitcoins.core.api.CallbackHandler import org.bitcoins.core.gcs.GolombFilter import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader, MerkleBlock} @@ -112,15 +113,25 @@ case class NodeCallbackStreamManager(callbacks: NodeCallbacks)(implicit override def start(): Future[Unit] = Future.unit + private val isStopped: AtomicBoolean = AtomicBoolean(false) + /** Completes all streams and waits until they are fully drained */ override def stop(): Future[Unit] = { val start = System.currentTimeMillis() - //complete all queues - filterQueue.complete() - txQueue.complete() - headerQueue.complete() - merkleBlockQueue.complete() - blockQueue.complete() + + //can't complete a stream twice + if (!isStopped.get()) { + //complete all queues + filterQueue.complete() + txQueue.complete() + headerQueue.complete() + merkleBlockQueue.complete() + blockQueue.complete() + isStopped.set(true) + } else { + logger.warn( + s"Already stopped all queues associated with this NodeCallBackStreamManager") + } for { _ <- filterSinkCompleteF