diff --git a/app/server-test/src/test/scala/org/bitcoins/server/CallBackUtilTest.scala b/app/server-test/src/test/scala/org/bitcoins/server/CallBackUtilTest.scala new file mode 100644 index 0000000000..80bd675aa0 --- /dev/null +++ b/app/server-test/src/test/scala/org/bitcoins/server/CallBackUtilTest.scala @@ -0,0 +1,96 @@ +package org.bitcoins.server + +import akka.stream.KillSwitches +import org.bitcoins.asyncutil.AsyncUtil +import org.bitcoins.server.util.CallbackUtil +import org.bitcoins.testkit.wallet.BitcoinSWalletTest +import org.bitcoins.testkit.wallet.FundWalletUtil.FundedWallet +import org.bitcoins.testkitcore.Implicits.GeneratorOps +import org.bitcoins.testkitcore.gen.TransactionGenerators +import org.scalatest.FutureOutcome + +import scala.concurrent.duration.DurationInt + +class CallBackUtilTest extends BitcoinSWalletTest { + + behavior of "CallBackUtil" + + override type FixtureParam = FundedWallet + + override def withFixture(test: OneArgAsyncTest): FutureOutcome = + withFundedWallet(test, getBIP39PasswordOpt())(getFreshWalletAppConfig) + + it must "have the kill switch kill messages to the createBitcoindNodeCallbacksForWallet callback" in { + fundedWallet => + val wallet = fundedWallet.wallet + val addressF = wallet.getNewAddress() + val initBalanceF = wallet.getBalance() + val tx1F = addressF.map { addr => + TransactionGenerators + .transactionTo(addr.scriptPubKey) + .sampleSome + } + val tx2F = addressF.map { addr => + TransactionGenerators + .transactionTo(addr.scriptPubKey) + .sampleSome + } + + val killSwitch = KillSwitches.shared("callbackutil-test-killswitch") + val callbacksF = + CallbackUtil.createBitcoindNodeCallbacksForWallet(wallet, killSwitch) + for { + tx1 <- tx1F + tx2 <- tx2F + callbacks <- callbacksF + _ <- callbacks.executeOnTxReceivedCallbacks(logger, tx1) + _ <- AsyncUtil.nonBlockingSleep(1000.millis) + initBalance <- initBalanceF + balance2 <- wallet.getBalance() + _ = killSwitch.shutdown() + _ <- callbacks.executeOnTxReceivedCallbacks(logger, tx2) + _ <- AsyncUtil.nonBlockingSleep(1000.millis) + balance3 <- wallet.getBalance() + } yield { + assert(balance2 > initBalance) + assert(balance3 == balance2) + } + } + + it must "have the kill switch kill messages to the createNeutrinoNodeCallbacksForWallet callback" in { + fundedWallet => + val wallet = fundedWallet.wallet + val addressF = wallet.getNewAddress() + val initBalanceF = wallet.getBalance() + val tx1F = addressF.map { addr => + TransactionGenerators + .transactionTo(addr.scriptPubKey) + .sampleSome + } + val tx2F = addressF.map { addr => + TransactionGenerators + .transactionTo(addr.scriptPubKey) + .sampleSome + } + + val killSwitch = KillSwitches.shared("callbackutil-test2-killswitch") + val callbacksF = + CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet, killSwitch) + for { + tx1 <- tx1F + tx2 <- tx2F + callbacks <- callbacksF + _ <- callbacks.executeOnTxReceivedCallbacks(logger, tx1) + _ <- AsyncUtil.nonBlockingSleep(1000.millis) + initBalance <- initBalanceF + balance2 <- wallet.getBalance() + _ = killSwitch.shutdown() + _ <- callbacks.executeOnTxReceivedCallbacks(logger, tx2) + _ <- AsyncUtil.nonBlockingSleep(1000.millis) + balance3 <- wallet.getBalance() + } yield { + assert(balance2 > initBalance) + assert(balance3 == balance2) + } + } +} diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala index 1c31fd9c81..0993788da5 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala @@ -1,7 +1,6 @@ package org.bitcoins.server import akka.actor.ActorSystem -import akka.stream.OverflowStrategy import akka.stream.scaladsl.{ BroadcastHub, Keep, @@ -9,6 +8,7 @@ import akka.stream.scaladsl.{ Source, SourceQueueWithComplete } +import akka.stream.{KillSwitches, OverflowStrategy, SharedKillSwitch} import akka.{Done, NotUsed} import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.asyncutil.AsyncUtil.Exponential @@ -59,6 +59,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit implicit lazy val bitcoindRpcConf: BitcoindRpcAppConfig = conf.bitcoindRpcConf implicit lazy val torConf: TorAppConfig = conf.torConf + private val nodeCallbackKillSwitch: SharedKillSwitch = { + KillSwitches.shared("node-callback-killswitch") + } + override def start(): Future[Unit] = { logger.info("Starting appServer") val startTime = TimeUtil.currentEpochMs @@ -132,12 +136,15 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit walletConf.feeProviderTargetOpt, walletConf.torConf.socks5ProxyParams, walletConf.network) + //get our wallet val configuredWalletF = for { node <- nodeF _ = logger.info("Initialized chain api") wallet <- dlcConf.createDLCWallet(node, chainApi, feeProvider) - nodeCallbacks <- CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet) + nodeCallbacks <- CallbackUtil.createNeutrinoNodeCallbacksForWallet( + wallet, + nodeCallbackKillSwitch) _ = nodeConf.addCallbacks(nodeCallbacks) } yield { logger.info( @@ -297,6 +304,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit chainQueryApi = bitcoind, feeRateApi = feeProvider) val chainCallbacks = WebsocketUtil.buildChainCallbacks(wsQueue, bitcoind) + for { _ <- isTorStartedF tmpWallet <- tmpWalletF @@ -304,8 +312,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit bitcoind, tmpWallet, Some(chainCallbacks)) + nodeCallbacks <- CallbackUtil.createBitcoindNodeCallbacksForWallet( - wallet) + wallet, + nodeCallbackKillSwitch) _ = nodeConf.addCallbacks(nodeCallbacks) _ = logger.info("Starting wallet") _ <- wallet.start() diff --git a/app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala b/app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala index 2151a6ee72..30674e5012 100644 --- a/app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala +++ b/app/server/src/main/scala/org/bitcoins/server/util/CallbackUtil.scala @@ -1,7 +1,13 @@ package org.bitcoins.server.util +import akka.actor.ActorSystem +import akka.stream.SharedKillSwitch +import akka.stream.scaladsl.{Sink, Source} import grizzled.slf4j.Logging -import org.bitcoins.core.api.node.{ExternalImplementationNodeType, NodeType} +import org.bitcoins.core.gcs.GolombFilter +import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader} +import org.bitcoins.core.protocol.transaction.Transaction +import org.bitcoins.crypto.DoubleSha256Digest import org.bitcoins.node.{ NodeCallbacks, OnBlockHeadersReceived, @@ -9,62 +15,108 @@ import org.bitcoins.node.{ OnCompactFiltersReceived, OnTxReceived } -import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.wallet.Wallet -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.Future object CallbackUtil extends Logging { - def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit - nodeConf: NodeAppConfig, - ec: ExecutionContext): Future[NodeCallbacks] = { - lazy val onTx: OnTxReceived = { tx => + def createNeutrinoNodeCallbacksForWallet( + wallet: Wallet, + killSwitch: SharedKillSwitch)(implicit + system: ActorSystem): Future[NodeCallbacks] = { + import system.dispatcher + //val killSwitch = KillSwitches.shared("node-callback-kill-switch") + val txSink = Sink.foreachAsync[Transaction](1) { case tx: Transaction => logger.debug(s"Receiving transaction txid=${tx.txIdBE.hex} as a callback") - wallet.processTransaction(tx, blockHashOpt = None).map(_ => ()) + wallet + .processTransaction(tx, blockHashOpt = None) + .map(_ => ()) + } + + val compactFilterSink = { + Sink.foreachAsync[Vector[(DoubleSha256Digest, GolombFilter)]](1) { + case blockFilters: Vector[(DoubleSha256Digest, GolombFilter)] => + logger.debug( + s"Executing onCompactFilters callback with filter count=${blockFilters.length}") + wallet + .processCompactFilters(blockFilters = blockFilters) + .map(_ => ()) + } + } + + val blockSink = { + Sink.foreachAsync[Block](1) { case block: Block => + logger.debug( + s"Executing onBlock callback=${block.blockHeader.hashBE.hex}") + wallet.processBlock(block).map(_ => ()) + } + } + + val onHeaderSink = { + Sink.foreachAsync(1) { headers: Vector[BlockHeader] => + logger.debug( + s"Executing block header with header count=${headers.length}") + if (headers.isEmpty) { + Future.unit + } else { + wallet.updateUtxoPendingStates().map(_ => ()) + } + } + } + + lazy val onTx: OnTxReceived = { tx => + Source + .single(tx) + .via(killSwitch.flow) + .runWith(txSink) + .map(_ => ()) } lazy val onCompactFilters: OnCompactFiltersReceived = { blockFilters => - logger.debug( - s"Executing onCompactFilters callback with filter count=${blockFilters.length}") - wallet - .processCompactFilters(blockFilters = blockFilters) + Source + .single(blockFilters) + .via(killSwitch.flow) + .runWith(compactFilterSink) .map(_ => ()) } lazy val onBlock: OnBlockReceived = { block => - logger.debug( - s"Executing onBlock callback=${block.blockHeader.hashBE.hex}") - wallet.processBlock(block).map(_ => ()) + Source + .single(block) + .via(killSwitch.flow) + .runWith(blockSink) + .map(_ => ()) } lazy val onHeaders: OnBlockHeadersReceived = { headers => - logger.debug( - s"Executing block header with header count=${headers.length}") - if (headers.isEmpty) { - Future.unit - } else { - wallet.updateUtxoPendingStates().map(_ => ()) - } - } - nodeConf.nodeType match { - case NodeType.NeutrinoNode => - Future.successful( - NodeCallbacks(onTxReceived = Vector(onTx), - onBlockReceived = Vector(onBlock), - onCompactFiltersReceived = Vector(onCompactFilters), - onBlockHeadersReceived = Vector(onHeaders))) - case NodeType.FullNode => - Future.failed(new RuntimeException("Not yet implemented")) - case _: ExternalImplementationNodeType => - Future.failed( - new RuntimeException( - "Cannot create callbacks for an external implementation")) + Source + .single(headers) + .via(killSwitch.flow) + .runWith(onHeaderSink) + .map(_ => ()) } + Future.successful( + NodeCallbacks(onTxReceived = Vector(onTx), + onBlockReceived = Vector(onBlock), + onCompactFiltersReceived = Vector(onCompactFilters), + onBlockHeadersReceived = Vector(onHeaders))) } - def createBitcoindNodeCallbacksForWallet(wallet: Wallet)(implicit - ec: ExecutionContext): Future[NodeCallbacks] = { - val onTx: OnTxReceived = { tx => + def createBitcoindNodeCallbacksForWallet( + wallet: Wallet, + killSwitch: SharedKillSwitch)(implicit + system: ActorSystem): Future[NodeCallbacks] = { + import system.dispatcher + val txSink = Sink.foreachAsync[Transaction](1) { case tx: Transaction => logger.debug(s"Receiving transaction txid=${tx.txIdBE.hex} as a callback") - wallet.processTransaction(tx, blockHashOpt = None).map(_ => ()) + wallet + .processTransaction(tx, blockHashOpt = None) + .map(_ => ()) + } + val onTx: OnTxReceived = { tx => + Source + .single(tx) + .via(killSwitch.flow) + .runWith(txSink) + .map(_ => ()) } Future.successful(NodeCallbacks(onTxReceived = Vector(onTx))) } diff --git a/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala index a4f08515d4..335207df25 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/wallet/BitcoinSWalletTest.scala @@ -1,6 +1,7 @@ package org.bitcoins.testkit.wallet import akka.actor.ActorSystem +import akka.stream.{KillSwitches, SharedKillSwitch} import com.typesafe.config.{Config, ConfigFactory} import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.commons.config.AppConfig @@ -15,7 +16,6 @@ import org.bitcoins.core.util.FutureUtil import org.bitcoins.core.wallet.fee._ import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.dlc.wallet.{DLCAppConfig, DLCWallet} -import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.NodeCallbacks import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion} import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient @@ -645,10 +645,11 @@ object BitcoinSWalletTest extends WalletLogger { chainQueryApi = chainQueryApi, bip39PasswordOpt = bip39PasswordOpt)(config.walletConf, system) //add callbacks for wallet + killSwitch = KillSwitches.shared("fundedWalletAndBitcoind-killswitch") nodeCallbacks <- - BitcoinSWalletTest.createNeutrinoNodeCallbacksForWallet(wallet)( - config.nodeConf, - system.dispatcher) + BitcoinSWalletTest.createNeutrinoNodeCallbacksForWallet( + wallet, + killSwitch)(system) _ = config.nodeConf.addCallbacks(nodeCallbacks) withBitcoind <- createWalletWithBitcoind(wallet, bitcoindRpcClient) funded <- FundWalletUtil.fundWalletWithBitcoind(withBitcoind) @@ -693,10 +694,11 @@ object BitcoinSWalletTest extends WalletLogger { } /** Constructs callbacks for the wallet from the node to process blocks and compact filters */ - def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit - nodeAppConfig: NodeAppConfig, - ec: ExecutionContext): Future[NodeCallbacks] = { - CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet) + def createNeutrinoNodeCallbacksForWallet( + wallet: Wallet, + killSwitch: SharedKillSwitch)(implicit + system: ActorSystem): Future[NodeCallbacks] = { + CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet, killSwitch) } /** Makes sure our wallet is fully funded with the default amounts specified in