2022 07 17 node callback akka streams (#4516)

* Implement akka stream proxy for nodecallbacks

Add killswitch to createBitcoindNodeCallbacksForWallet

Add unit test for killswitch in createBitcoindNodeCallbacksForWallet

Add delays to make sure callbacks are executed

Fix rebase

Move killswitch out of methods into class level val

* Bump timeout
This commit is contained in:
Chris Stewart 2022-07-19 13:45:41 -05:00 committed by GitHub
parent e3a7c0971f
commit 0cb93ddbf4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 211 additions and 51 deletions

View file

@ -0,0 +1,96 @@
package org.bitcoins.server
import akka.stream.KillSwitches
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.server.util.CallbackUtil
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.bitcoins.testkit.wallet.FundWalletUtil.FundedWallet
import org.bitcoins.testkitcore.Implicits.GeneratorOps
import org.bitcoins.testkitcore.gen.TransactionGenerators
import org.scalatest.FutureOutcome
import scala.concurrent.duration.DurationInt
class CallBackUtilTest extends BitcoinSWalletTest {
behavior of "CallBackUtil"
override type FixtureParam = FundedWallet
override def withFixture(test: OneArgAsyncTest): FutureOutcome =
withFundedWallet(test, getBIP39PasswordOpt())(getFreshWalletAppConfig)
it must "have the kill switch kill messages to the createBitcoindNodeCallbacksForWallet callback" in {
fundedWallet =>
val wallet = fundedWallet.wallet
val addressF = wallet.getNewAddress()
val initBalanceF = wallet.getBalance()
val tx1F = addressF.map { addr =>
TransactionGenerators
.transactionTo(addr.scriptPubKey)
.sampleSome
}
val tx2F = addressF.map { addr =>
TransactionGenerators
.transactionTo(addr.scriptPubKey)
.sampleSome
}
val killSwitch = KillSwitches.shared("callbackutil-test-killswitch")
val callbacksF =
CallbackUtil.createBitcoindNodeCallbacksForWallet(wallet, killSwitch)
for {
tx1 <- tx1F
tx2 <- tx2F
callbacks <- callbacksF
_ <- callbacks.executeOnTxReceivedCallbacks(logger, tx1)
_ <- AsyncUtil.nonBlockingSleep(1000.millis)
initBalance <- initBalanceF
balance2 <- wallet.getBalance()
_ = killSwitch.shutdown()
_ <- callbacks.executeOnTxReceivedCallbacks(logger, tx2)
_ <- AsyncUtil.nonBlockingSleep(1000.millis)
balance3 <- wallet.getBalance()
} yield {
assert(balance2 > initBalance)
assert(balance3 == balance2)
}
}
it must "have the kill switch kill messages to the createNeutrinoNodeCallbacksForWallet callback" in {
fundedWallet =>
val wallet = fundedWallet.wallet
val addressF = wallet.getNewAddress()
val initBalanceF = wallet.getBalance()
val tx1F = addressF.map { addr =>
TransactionGenerators
.transactionTo(addr.scriptPubKey)
.sampleSome
}
val tx2F = addressF.map { addr =>
TransactionGenerators
.transactionTo(addr.scriptPubKey)
.sampleSome
}
val killSwitch = KillSwitches.shared("callbackutil-test2-killswitch")
val callbacksF =
CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet, killSwitch)
for {
tx1 <- tx1F
tx2 <- tx2F
callbacks <- callbacksF
_ <- callbacks.executeOnTxReceivedCallbacks(logger, tx1)
_ <- AsyncUtil.nonBlockingSleep(1000.millis)
initBalance <- initBalanceF
balance2 <- wallet.getBalance()
_ = killSwitch.shutdown()
_ <- callbacks.executeOnTxReceivedCallbacks(logger, tx2)
_ <- AsyncUtil.nonBlockingSleep(1000.millis)
balance3 <- wallet.getBalance()
} yield {
assert(balance2 > initBalance)
assert(balance3 == balance2)
}
}
}

View file

@ -1,7 +1,6 @@
package org.bitcoins.server
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{
BroadcastHub,
Keep,
@ -9,6 +8,7 @@ import akka.stream.scaladsl.{
Source,
SourceQueueWithComplete
}
import akka.stream.{KillSwitches, OverflowStrategy, SharedKillSwitch}
import akka.{Done, NotUsed}
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.asyncutil.AsyncUtil.Exponential
@ -59,6 +59,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
implicit lazy val bitcoindRpcConf: BitcoindRpcAppConfig = conf.bitcoindRpcConf
implicit lazy val torConf: TorAppConfig = conf.torConf
private val nodeCallbackKillSwitch: SharedKillSwitch = {
KillSwitches.shared("node-callback-killswitch")
}
override def start(): Future[Unit] = {
logger.info("Starting appServer")
val startTime = TimeUtil.currentEpochMs
@ -132,12 +136,15 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
walletConf.feeProviderTargetOpt,
walletConf.torConf.socks5ProxyParams,
walletConf.network)
//get our wallet
val configuredWalletF = for {
node <- nodeF
_ = logger.info("Initialized chain api")
wallet <- dlcConf.createDLCWallet(node, chainApi, feeProvider)
nodeCallbacks <- CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet)
nodeCallbacks <- CallbackUtil.createNeutrinoNodeCallbacksForWallet(
wallet,
nodeCallbackKillSwitch)
_ = nodeConf.addCallbacks(nodeCallbacks)
} yield {
logger.info(
@ -297,6 +304,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
chainQueryApi = bitcoind,
feeRateApi = feeProvider)
val chainCallbacks = WebsocketUtil.buildChainCallbacks(wsQueue, bitcoind)
for {
_ <- isTorStartedF
tmpWallet <- tmpWalletF
@ -304,8 +312,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
bitcoind,
tmpWallet,
Some(chainCallbacks))
nodeCallbacks <- CallbackUtil.createBitcoindNodeCallbacksForWallet(
wallet)
wallet,
nodeCallbackKillSwitch)
_ = nodeConf.addCallbacks(nodeCallbacks)
_ = logger.info("Starting wallet")
_ <- wallet.start()

View file

@ -1,7 +1,13 @@
package org.bitcoins.server.util
import akka.actor.ActorSystem
import akka.stream.SharedKillSwitch
import akka.stream.scaladsl.{Sink, Source}
import grizzled.slf4j.Logging
import org.bitcoins.core.api.node.{ExternalImplementationNodeType, NodeType}
import org.bitcoins.core.gcs.GolombFilter
import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader}
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.node.{
NodeCallbacks,
OnBlockHeadersReceived,
@ -9,33 +15,46 @@ import org.bitcoins.node.{
OnCompactFiltersReceived,
OnTxReceived
}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.wallet.Wallet
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future
object CallbackUtil extends Logging {
def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit
nodeConf: NodeAppConfig,
ec: ExecutionContext): Future[NodeCallbacks] = {
lazy val onTx: OnTxReceived = { tx =>
def createNeutrinoNodeCallbacksForWallet(
wallet: Wallet,
killSwitch: SharedKillSwitch)(implicit
system: ActorSystem): Future[NodeCallbacks] = {
import system.dispatcher
//val killSwitch = KillSwitches.shared("node-callback-kill-switch")
val txSink = Sink.foreachAsync[Transaction](1) { case tx: Transaction =>
logger.debug(s"Receiving transaction txid=${tx.txIdBE.hex} as a callback")
wallet.processTransaction(tx, blockHashOpt = None).map(_ => ())
wallet
.processTransaction(tx, blockHashOpt = None)
.map(_ => ())
}
lazy val onCompactFilters: OnCompactFiltersReceived = { blockFilters =>
val compactFilterSink = {
Sink.foreachAsync[Vector[(DoubleSha256Digest, GolombFilter)]](1) {
case blockFilters: Vector[(DoubleSha256Digest, GolombFilter)] =>
logger.debug(
s"Executing onCompactFilters callback with filter count=${blockFilters.length}")
wallet
.processCompactFilters(blockFilters = blockFilters)
.map(_ => ())
}
lazy val onBlock: OnBlockReceived = { block =>
}
val blockSink = {
Sink.foreachAsync[Block](1) { case block: Block =>
logger.debug(
s"Executing onBlock callback=${block.blockHeader.hashBE.hex}")
wallet.processBlock(block).map(_ => ())
}
lazy val onHeaders: OnBlockHeadersReceived = { headers =>
}
val onHeaderSink = {
Sink.foreachAsync(1) { headers: Vector[BlockHeader] =>
logger.debug(
s"Executing block header with header count=${headers.length}")
if (headers.isEmpty) {
@ -44,27 +63,60 @@ object CallbackUtil extends Logging {
wallet.updateUtxoPendingStates().map(_ => ())
}
}
nodeConf.nodeType match {
case NodeType.NeutrinoNode =>
}
lazy val onTx: OnTxReceived = { tx =>
Source
.single(tx)
.via(killSwitch.flow)
.runWith(txSink)
.map(_ => ())
}
lazy val onCompactFilters: OnCompactFiltersReceived = { blockFilters =>
Source
.single(blockFilters)
.via(killSwitch.flow)
.runWith(compactFilterSink)
.map(_ => ())
}
lazy val onBlock: OnBlockReceived = { block =>
Source
.single(block)
.via(killSwitch.flow)
.runWith(blockSink)
.map(_ => ())
}
lazy val onHeaders: OnBlockHeadersReceived = { headers =>
Source
.single(headers)
.via(killSwitch.flow)
.runWith(onHeaderSink)
.map(_ => ())
}
Future.successful(
NodeCallbacks(onTxReceived = Vector(onTx),
onBlockReceived = Vector(onBlock),
onCompactFiltersReceived = Vector(onCompactFilters),
onBlockHeadersReceived = Vector(onHeaders)))
case NodeType.FullNode =>
Future.failed(new RuntimeException("Not yet implemented"))
case _: ExternalImplementationNodeType =>
Future.failed(
new RuntimeException(
"Cannot create callbacks for an external implementation"))
}
}
def createBitcoindNodeCallbacksForWallet(wallet: Wallet)(implicit
ec: ExecutionContext): Future[NodeCallbacks] = {
val onTx: OnTxReceived = { tx =>
def createBitcoindNodeCallbacksForWallet(
wallet: Wallet,
killSwitch: SharedKillSwitch)(implicit
system: ActorSystem): Future[NodeCallbacks] = {
import system.dispatcher
val txSink = Sink.foreachAsync[Transaction](1) { case tx: Transaction =>
logger.debug(s"Receiving transaction txid=${tx.txIdBE.hex} as a callback")
wallet.processTransaction(tx, blockHashOpt = None).map(_ => ())
wallet
.processTransaction(tx, blockHashOpt = None)
.map(_ => ())
}
val onTx: OnTxReceived = { tx =>
Source
.single(tx)
.via(killSwitch.flow)
.runWith(txSink)
.map(_ => ())
}
Future.successful(NodeCallbacks(onTxReceived = Vector(onTx)))
}

View file

@ -1,6 +1,7 @@
package org.bitcoins.testkit.wallet
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, SharedKillSwitch}
import com.typesafe.config.{Config, ConfigFactory}
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.commons.config.AppConfig
@ -15,7 +16,6 @@ import org.bitcoins.core.util.FutureUtil
import org.bitcoins.core.wallet.fee._
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.dlc.wallet.{DLCAppConfig, DLCWallet}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.NodeCallbacks
import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion}
import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient
@ -645,10 +645,11 @@ object BitcoinSWalletTest extends WalletLogger {
chainQueryApi = chainQueryApi,
bip39PasswordOpt = bip39PasswordOpt)(config.walletConf, system)
//add callbacks for wallet
killSwitch = KillSwitches.shared("fundedWalletAndBitcoind-killswitch")
nodeCallbacks <-
BitcoinSWalletTest.createNeutrinoNodeCallbacksForWallet(wallet)(
config.nodeConf,
system.dispatcher)
BitcoinSWalletTest.createNeutrinoNodeCallbacksForWallet(
wallet,
killSwitch)(system)
_ = config.nodeConf.addCallbacks(nodeCallbacks)
withBitcoind <- createWalletWithBitcoind(wallet, bitcoindRpcClient)
funded <- FundWalletUtil.fundWalletWithBitcoind(withBitcoind)
@ -693,10 +694,11 @@ object BitcoinSWalletTest extends WalletLogger {
}
/** Constructs callbacks for the wallet from the node to process blocks and compact filters */
def createNeutrinoNodeCallbacksForWallet(wallet: Wallet)(implicit
nodeAppConfig: NodeAppConfig,
ec: ExecutionContext): Future[NodeCallbacks] = {
CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet)
def createNeutrinoNodeCallbacksForWallet(
wallet: Wallet,
killSwitch: SharedKillSwitch)(implicit
system: ActorSystem): Future[NodeCallbacks] = {
CallbackUtil.createNeutrinoNodeCallbacksForWallet(wallet, killSwitch)
}
/** Makes sure our wallet is fully funded with the default amounts specified in