From 0cb93ddbf48da42a804f032b7ed1821863ea23c1 Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Tue, 19 Jul 2022 13:45:41 -0500 Subject: [PATCH] 2022 07 17 node callback akka streams (#4516) * Implement akka stream proxy for nodecallbacks Add killswitch to createBitcoindNodeCallbacksForWallet Add unit test for killswitch in createBitcoindNodeCallbacksForWallet Add delays to make sure callbacks are executed Fix rebase Move killswitch out of methods into class level val * Bump timeout --- .../bitcoins/server/CallBackUtilTest.scala | 96 +++++++++++++ .../bitcoins/server/BitcoinSServerMain.scala | 16 ++- .../bitcoins/server/util/CallbackUtil.scala | 132 ++++++++++++------ .../testkit/wallet/BitcoinSWalletTest.scala | 18 +-- 4 files changed, 211 insertions(+), 51 deletions(-) create mode 100644 app/server-test/src/test/scala/org/bitcoins/server/CallBackUtilTest.scala diff --git a/app/server-test/src/test/scala/org/bitcoins/server/CallBackUtilTest.scala b/app/server-test/src/test/scala/org/bitcoins/server/CallBackUtilTest.scala new file mode 100644 index 0000000000..80bd675aa0 --- /dev/null +++ b/app/server-test/src/test/scala/org/bitcoins/server/CallBackUtilTest.scala @@ -0,0 +1,96 @@ +package org.bitcoins.server + +import akka.stream.KillSwitches +import org.bitcoins.asyncutil.AsyncUtil +import org.bitcoins.server.util.CallbackUtil +import org.bitcoins.testkit.wallet.BitcoinSWalletTest +import org.bitcoins.testkit.wallet.FundWalletUtil.FundedWallet +import org.bitcoins.testkitcore.Implicits.GeneratorOps +import org.bitcoins.testkitcore.gen.TransactionGenerators +import org.scalatest.FutureOutcome + +import scala.concurrent.duration.DurationInt + +class CallBackUtilTest extends BitcoinSWalletTest { + + behavior of "CallBackUtil" + + override type FixtureParam = FundedWallet + + override def withFixture(test: OneArgAsyncTest): FutureOutcome = + withFundedWallet(test, getBIP39PasswordOpt())(getFreshWalletAppConfig) + + it must "have the kill switch kill messages to the createBitcoindNodeCallbacksForWallet callback" in { + fundedWallet => + val wallet = fundedWallet.wallet + val addressF = wallet.getNewAddress() + val initBalanceF = wallet.getBalance() + val tx1F = addressF.map { addr => + TransactionGenerators + .transactionTo(addr.scriptPubKey) + .sampleSome + } + val tx2F = addressF.map { addr => + TransactionGenerators + .transactionTo(addr.scriptPubKey) + .sampleSome + } + + val killSwitch = KillSwitches.shared("callbackutil-test-killswitch") + val callbacksF = + CallbackUtil.createBitcoindNodeCallbacksForWallet(wallet, killSwitch) + for { + tx1 <- tx1F + tx2 <- tx2F + callbacks <- callbacksF + _ <- callbacks.executeOnTxReceivedCallbacks(logger, tx1) + _ <- AsyncUtil.nonBlockingSleep(1000.millis) + initBalance <- initBalanceF + balance2 <- wallet.getBalance() + _ = killSwitch.shutdown() + _ <- callbacks.executeOnTxReceivedCallbacks(logger, tx2) + _ <- AsyncUtil.nonBlockingSleep(1000.millis) + balance3 <- wallet.getBalance() + } yield { + assert(balance2 > initBalance) + assert(balance3 == balance2) + } + } + + it must "have the kill switch kill messages to the createNeutrinoNodeCallbacksForWallet callback" in { + fundedWallet => + val wallet = fundedWallet.wallet + val addressF = wallet.getNewAddress() + val initBalanceF = wallet.getBalance() + val tx1F = addressF.map { addr => + TransactionGenerators + .transactionTo(addr.scriptPubKey) + .sampleSome + } + val tx2F = addressF.map { addr => + TransactionGenerators + .transactionTo(addr.scriptPubKey) + .sampleSome + } + + val killSwitch = KillSwitches.shared("callbackutil-test2-killswitch") + val callbacksF = + CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet, killSwitch) + for { + tx1 <- tx1F + tx2 <- tx2F + callbacks <- callbacksF + _ <- callbacks.executeOnTxReceivedCallbacks(logger, tx1) + _ <- AsyncUtil.nonBlockingSleep(1000.millis) + initBalance <- initBalanceF + balance2 <- wallet.getBalance() + _ = killSwitch.shutdown() + _ <- callbacks.executeOnTxReceivedCallbacks(logger, tx2) + _ <- AsyncUtil.nonBlockingSleep(1000.millis) + balance3 <- wallet.getBalance() + } yield { + assert(balance2 > initBalance) + assert(balance3 == balance2) + } + } +} diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala index 1c31fd9c81..0993788da5 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala @@ -1,7 +1,6 @@ package org.bitcoins.server import akka.actor.ActorSystem -import akka.stream.OverflowStrategy import akka.stream.scaladsl.{ BroadcastHub, Keep, @@ -9,6 +8,7 @@ import akka.stream.scaladsl.{ Source, SourceQueueWithComplete } +import akka.stream.{KillSwitches, OverflowStrategy, SharedKillSwitch} import akka.{Done, NotUsed} import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.asyncutil.AsyncUtil.Exponential @@ -59,6 +59,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit implicit lazy val bitcoindRpcConf: BitcoindRpcAppConfig = conf.bitcoindRpcConf implicit lazy val torConf: TorAppConfig = conf.torConf + private val nodeCallbackKillSwitch: SharedKillSwitch = { + KillSwitches.shared("node-callback-killswitch") + } + override def start(): Future[Unit] = { logger.info("Starting appServer") val startTime = TimeUtil.currentEpochMs @@ -132,12 +136,15 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit walletConf.feeProviderTargetOpt, walletConf.torConf.socks5ProxyParams, walletConf.network) + //get our wallet val configuredWalletF = for { node <- nodeF _ = logger.info("Initialized chain api") wallet <- dlcConf.createDLCWallet(node, chainApi, feeProvider) - nodeCallbacks <- CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet) + nodeCallbacks <- CallbackUtil.createNeutrinoNodeCallbacksForWallet( + wallet, + nodeCallbackKillSwitch) _ = nodeConf.addCallbacks(nodeCallbacks) } yield { logger.info( @@ -297,6 +304,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit chainQueryApi = bitcoind, feeRateApi = feeProvider) val chainCallbacks = WebsocketUtil.buildChainCallbacks(wsQueue, bitcoind) + for { _ <- isTorStartedF tmpWallet <- tmpWalletF @@ -304,8 +312,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit bitcoind, tmpWallet, Some(chainCallbacks)) + nodeCallbacks <- CallbackUtil.createBitcoindNodeCallbacksForWallet( - wallet) + wallet, + nodeCallbackKillSwitch) _ = nodeConf.addCallbacks(nodeCallbacks) _ = logger.info("Starting wallet") _ <- wallet.start() diff --git a/app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala b/app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala index 2151a6ee72..30674e5012 100644 --- a/app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala +++ b/app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala @@ -1,7 +1,13 @@ package org.bitcoins.server.util +import akka.actor.ActorSystem +import akka.stream.SharedKillSwitch +import akka.stream.scaladsl.{Sink, Source} import grizzled.slf4j.Logging -import org.bitcoins.core.api.node.{ExternalImplementationNodeType, NodeType} +import org.bitcoins.core.gcs.GolombFilter +import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader} +import org.bitcoins.core.protocol.transaction.Transaction +import org.bitcoins.crypto.DoubleSha256Digest import org.bitcoins.node.{ NodeCallbacks, OnBlockHeadersReceived, @@ -9,62 +15,108 @@ import org.bitcoins.node.{ OnCompactFiltersReceived, OnTxReceived } -import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.wallet.Wallet -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.Future object CallbackUtil extends Logging { - def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit - nodeConf: NodeAppConfig, - ec: ExecutionContext): Future[NodeCallbacks] = { - lazy val onTx: OnTxReceived = { tx => + def createNeutrinoNodeCallbacksForWallet( + wallet: Wallet, + killSwitch: SharedKillSwitch)(implicit + system: ActorSystem): Future[NodeCallbacks] = { + import system.dispatcher + //val killSwitch = KillSwitches.shared("node-callback-kill-switch") + val txSink = Sink.foreachAsync[Transaction](1) { case tx: Transaction => logger.debug(s"Receiving transaction txid=${tx.txIdBE.hex} as a callback") - wallet.processTransaction(tx, blockHashOpt = None).map(_ => ()) + wallet + .processTransaction(tx, blockHashOpt = None) + .map(_ => ()) + } + + val compactFilterSink = { + Sink.foreachAsync[Vector[(DoubleSha256Digest, GolombFilter)]](1) { + case blockFilters: Vector[(DoubleSha256Digest, GolombFilter)] => + logger.debug( + s"Executing onCompactFilters callback with filter count=${blockFilters.length}") + wallet + .processCompactFilters(blockFilters = blockFilters) + .map(_ => ()) + } + } + + val blockSink = { + Sink.foreachAsync[Block](1) { case block: Block => + logger.debug( + s"Executing onBlock callback=${block.blockHeader.hashBE.hex}") + wallet.processBlock(block).map(_ => ()) + } + } + + val onHeaderSink = { + Sink.foreachAsync(1) { headers: Vector[BlockHeader] => + logger.debug( + s"Executing block header with header count=${headers.length}") + if (headers.isEmpty) { + Future.unit + } else { + wallet.updateUtxoPendingStates().map(_ => ()) + } + } + } + + lazy val onTx: OnTxReceived = { tx => + Source + .single(tx) + .via(killSwitch.flow) + .runWith(txSink) + .map(_ => ()) } lazy val onCompactFilters: OnCompactFiltersReceived = { blockFilters => - logger.debug( - s"Executing onCompactFilters callback with filter count=${blockFilters.length}") - wallet - .processCompactFilters(blockFilters = blockFilters) + Source + .single(blockFilters) + .via(killSwitch.flow) + .runWith(compactFilterSink) .map(_ => ()) } lazy val onBlock: OnBlockReceived = { block => - logger.debug( - s"Executing onBlock callback=${block.blockHeader.hashBE.hex}") - wallet.processBlock(block).map(_ => ()) + Source + .single(block) + .via(killSwitch.flow) + .runWith(blockSink) + .map(_ => ()) } lazy val onHeaders: OnBlockHeadersReceived = { headers => - logger.debug( - s"Executing block header with header count=${headers.length}") - if (headers.isEmpty) { - Future.unit - } else { - wallet.updateUtxoPendingStates().map(_ => ()) - } - } - nodeConf.nodeType match { - case NodeType.NeutrinoNode => - Future.successful( - NodeCallbacks(onTxReceived = Vector(onTx), - onBlockReceived = Vector(onBlock), - onCompactFiltersReceived = Vector(onCompactFilters), - onBlockHeadersReceived = Vector(onHeaders))) - case NodeType.FullNode => - Future.failed(new RuntimeException("Not yet implemented")) - case _: ExternalImplementationNodeType => - Future.failed( - new RuntimeException( - "Cannot create callbacks for an external implementation")) + Source + .single(headers) + .via(killSwitch.flow) + .runWith(onHeaderSink) + .map(_ => ()) } + Future.successful( + NodeCallbacks(onTxReceived = Vector(onTx), + onBlockReceived = Vector(onBlock), + onCompactFiltersReceived = Vector(onCompactFilters), + onBlockHeadersReceived = Vector(onHeaders))) } - def createBitcoindNodeCallbacksForWallet(wallet: Wallet)(implicit - ec: ExecutionContext): Future[NodeCallbacks] = { - val onTx: OnTxReceived = { tx => + def createBitcoindNodeCallbacksForWallet( + wallet: Wallet, + killSwitch: SharedKillSwitch)(implicit + system: ActorSystem): Future[NodeCallbacks] = { + import system.dispatcher + val txSink = Sink.foreachAsync[Transaction](1) { case tx: Transaction => logger.debug(s"Receiving transaction txid=${tx.txIdBE.hex} as a callback") - wallet.processTransaction(tx, blockHashOpt = None).map(_ => ()) + wallet + .processTransaction(tx, blockHashOpt = None) + .map(_ => ()) + } + val onTx: OnTxReceived = { tx => + Source + .single(tx) + .via(killSwitch.flow) + .runWith(txSink) + .map(_ => ()) } Future.successful(NodeCallbacks(onTxReceived = Vector(onTx))) } diff --git a/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala index a4f08515d4..335207df25 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala @@ -1,6 +1,7 @@ package org.bitcoins.testkit.wallet import akka.actor.ActorSystem +import akka.stream.{KillSwitches, SharedKillSwitch} import com.typesafe.config.{Config, ConfigFactory} import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.commons.config.AppConfig @@ -15,7 +16,6 @@ import org.bitcoins.core.util.FutureUtil import org.bitcoins.core.wallet.fee._ import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.dlc.wallet.{DLCAppConfig, DLCWallet} -import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.NodeCallbacks import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion} import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient @@ -645,10 +645,11 @@ object BitcoinSWalletTest extends WalletLogger { chainQueryApi = chainQueryApi, bip39PasswordOpt = bip39PasswordOpt)(config.walletConf, system) //add callbacks for wallet + killSwitch = KillSwitches.shared("fundedWalletAndBitcoind-killswitch") nodeCallbacks <- - BitcoinSWalletTest.createNeutrinoNodeCallbacksForWallet(wallet)( - config.nodeConf, - system.dispatcher) + BitcoinSWalletTest.createNeutrinoNodeCallbacksForWallet( + wallet, + killSwitch)(system) _ = config.nodeConf.addCallbacks(nodeCallbacks) withBitcoind <- createWalletWithBitcoind(wallet, bitcoindRpcClient) funded <- FundWalletUtil.fundWalletWithBitcoind(withBitcoind) @@ -693,10 +694,11 @@ object BitcoinSWalletTest extends WalletLogger { } /** Constructs callbacks for the wallet from the node to process blocks and compact filters */ - def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit - nodeAppConfig: NodeAppConfig, - ec: ExecutionContext): Future[NodeCallbacks] = { - CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet) + def createNeutrinoNodeCallbacksForWallet( + wallet: Wallet, + killSwitch: SharedKillSwitch)(implicit + system: ActorSystem): Future[NodeCallbacks] = { + CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet, killSwitch) } /** Makes sure our wallet is fully funded with the default amounts specified in