Replace BoundedSourceQueueWithComplete with SourceQueueWithComplete (#4576)

* Replace BoundedSourceQueueWithComplete with SourceQueueWithComplete so we can use the returned Future

* Recover the expected exceptions in CallBackUtilTest
This commit is contained in:
Chris Stewart 2022-08-05 11:57:13 -05:00 committed by GitHub
parent f286b42c71
commit 5acbba9377
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 30 deletions

View file

@ -1,5 +1,6 @@
package org.bitcoins.server
import akka.stream.StreamDetachedException
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.server.util.CallbackUtil
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
@ -8,6 +9,7 @@ import org.bitcoins.testkitcore.Implicits.GeneratorOps
import org.bitcoins.testkitcore.gen.TransactionGenerators
import org.scalatest.FutureOutcome
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
class CallBackUtilTest extends BitcoinSWalletTest {
@ -46,7 +48,13 @@ class CallBackUtilTest extends BitcoinSWalletTest {
_ <- AsyncUtil.nonBlockingSleep(5000.millis)
balance2 <- wallet.getBalance()
_ <- callbacks.stop()
_ <- callbacks.executeOnTxReceivedCallbacks(logger, tx2)
_ <- callbacks
.executeOnTxReceivedCallbacks(logger, tx2)
.recoverWith { case _: StreamDetachedException =>
//expect the stream to be detatched because we stopped
//the stream with callbacks.stop()
Future.unit
}
balance3 <- wallet.getBalance()
} yield {
assert(balance2 > initBalance)
@ -81,7 +89,13 @@ class CallBackUtilTest extends BitcoinSWalletTest {
_ <- AsyncUtil.nonBlockingSleep(5000.millis)
balance2 <- wallet.getBalance()
_ <- callbacks.stop()
_ <- callbacks.executeOnTxReceivedCallbacks(logger, tx2)
_ <- callbacks
.executeOnTxReceivedCallbacks(logger, tx2)
.recoverWith { case _: StreamDetachedException =>
//expect the stream to be detatched because we stopped
//the stream with callbacks.stop()
Future.unit
}
balance3 <- wallet.getBalance()
} yield {
assert(balance2 > initBalance)

View file

@ -2,8 +2,8 @@ package org.bitcoins.node.callback
import akka.Done
import akka.actor.ActorSystem
import akka.stream.BoundedSourceQueue
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
import grizzled.slf4j.{Logger, Logging}
import monix.execution.atomic.AtomicBoolean
import org.bitcoins.core.api.CallbackHandler
@ -24,18 +24,19 @@ import org.bitcoins.node.{
import scala.concurrent.{ExecutionContext, Future}
/** Creates a wrapper around the give node callbacks with a stream */
case class NodeCallbackStreamManager(callbacks: NodeCallbacks)(implicit
system: ActorSystem)
case class NodeCallbackStreamManager(
callbacks: NodeCallbacks,
overflowStrategy: OverflowStrategy = OverflowStrategy.backpressure,
maxBufferSize: Int = 1000)(implicit system: ActorSystem)
extends NodeCallbacks
with StartStopAsync[Unit]
with Logging {
import system.dispatcher
private val maxBufferSize: Int = 1000
private val filterQueueSource: Source[
Vector[(DoubleSha256Digest, GolombFilter)],
BoundedSourceQueue[Vector[(DoubleSha256Digest, GolombFilter)]]] = {
Source.queue(maxBufferSize)
SourceQueueWithComplete[Vector[(DoubleSha256Digest, GolombFilter)]]] = {
Source.queue(maxBufferSize, overflowStrategy)
}
private val filterSink: Sink[
@ -51,8 +52,8 @@ case class NodeCallbackStreamManager(callbacks: NodeCallbacks)(implicit
private val txQueueSource: Source[
Transaction,
BoundedSourceQueue[Transaction]] = {
Source.queue(maxBufferSize)
SourceQueueWithComplete[Transaction]] = {
Source.queue(maxBufferSize, overflowStrategy)
}
private val txSink: Sink[Transaction, Future[Done]] = {
@ -66,8 +67,8 @@ case class NodeCallbackStreamManager(callbacks: NodeCallbacks)(implicit
private val headerQueueSource: Source[
Vector[BlockHeader],
BoundedSourceQueue[Vector[BlockHeader]]] = {
Source.queue(maxBufferSize)
SourceQueueWithComplete[Vector[BlockHeader]]] = {
Source.queue(maxBufferSize, overflowStrategy)
}
private val headerSink: Sink[Vector[BlockHeader], Future[Done]] = {
@ -79,8 +80,10 @@ case class NodeCallbackStreamManager(callbacks: NodeCallbacks)(implicit
private val (headerQueue, headerSinkCompleteF) =
matSourceAndQueue(headerQueueSource, headerSink)
private val blockQueueSource: Source[Block, BoundedSourceQueue[Block]] = {
Source.queue(maxBufferSize)
private val blockQueueSource: Source[
Block,
SourceQueueWithComplete[Block]] = {
Source.queue(maxBufferSize, overflowStrategy)
}
private val blockSink: Sink[Block, Future[Done]] = {
@ -94,8 +97,8 @@ case class NodeCallbackStreamManager(callbacks: NodeCallbacks)(implicit
private val merkleBlockQueueSource: Source[
(MerkleBlock, Vector[Transaction]),
BoundedSourceQueue[(MerkleBlock, Vector[Transaction])]] = {
Source.queue(maxBufferSize)
SourceQueueWithComplete[(MerkleBlock, Vector[Transaction])]] = {
Source.queue(maxBufferSize, overflowStrategy)
}
private val merkleBlockSink: Sink[
@ -148,8 +151,10 @@ case class NodeCallbackStreamManager(callbacks: NodeCallbacks)(implicit
}
private def matSourceAndQueue[T](
source: Source[T, BoundedSourceQueue[T]],
sink: Sink[T, Future[Done]]): (BoundedSourceQueue[T], Future[Done]) = {
source: Source[T, SourceQueueWithComplete[T]],
sink: Sink[T, Future[Done]]): (
SourceQueueWithComplete[T],
Future[Done]) = {
source
.toMat(sink)(Keep.both)
.run()
@ -178,38 +183,43 @@ case class NodeCallbackStreamManager(callbacks: NodeCallbacks)(implicit
override def executeOnTxReceivedCallbacks(logger: Logger, tx: Transaction)(
implicit ec: ExecutionContext): Future[Unit] = {
txQueue.offer(tx)
Future.unit
txQueue
.offer(tx)
.map(_ => ())
}
override def executeOnBlockReceivedCallbacks(logger: Logger, block: Block)(
implicit ec: ExecutionContext): Future[Unit] = {
blockQueue.offer(block)
Future.unit
blockQueue
.offer(block)
.map(_ => ())
}
override def executeOnCompactFiltersReceivedCallbacks(
logger: Logger,
blockFilters: Vector[(DoubleSha256Digest, GolombFilter)])(implicit
ec: ExecutionContext): Future[Unit] = {
filterQueue.offer(blockFilters)
Future.unit
filterQueue
.offer(blockFilters)
.map(_ => ())
}
override def executeOnMerkleBlockReceivedCallbacks(
logger: Logger,
merkleBlock: MerkleBlock,
txs: Vector[Transaction])(implicit ec: ExecutionContext): Future[Unit] = {
merkleBlockQueue.offer((merkleBlock, txs))
Future.unit
merkleBlockQueue
.offer((merkleBlock, txs))
.map(_ => ())
}
override def executeOnBlockHeadersReceivedCallbacks(
logger: Logger,
headers: Vector[BlockHeader])(implicit
ec: ExecutionContext): Future[Unit] = {
headerQueue.offer(headers)
Future.unit
headerQueue
.offer(headers)
.map(_ => ())
}
/** Consider calling [[stop()]] before creating a new instance of callbacks

View file

@ -20,7 +20,7 @@
</encoder>
</appender>
<root level="OFF">
<root level="INFO">
<appender-ref ref="FILE"/>
<appender-ref ref="STDOUT"/>
</root>