2024 07 29 rm tx bitcoind callbacks (#5632)

* fix docs

* Fix docs

* WIP

* Integrate BitcoindCallbacks

* Revert loback-test.xml

* Remove logs
This commit is contained in:
Chris Stewart 2024-08-03 05:16:57 -07:00 committed by GitHub
parent 38850d22e3
commit 458f3cb7d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 42 additions and 27 deletions

View File

@ -28,18 +28,19 @@ import org.bitcoins.core.api.wallet.{NeutrinoHDWalletApi, WalletApi}
import org.bitcoins.core.util.TimeUtil import org.bitcoins.core.util.TimeUtil
import org.bitcoins.dlc.node.DLCNode import org.bitcoins.dlc.node.DLCNode
import org.bitcoins.dlc.node.config.DLCNodeAppConfig import org.bitcoins.dlc.node.config.DLCNodeAppConfig
import org.bitcoins.dlc.wallet._ import org.bitcoins.dlc.wallet.*
import org.bitcoins.feeprovider.MempoolSpaceTarget.HourFeeTarget import org.bitcoins.feeprovider.MempoolSpaceTarget.HourFeeTarget
import org.bitcoins.feeprovider._ import org.bitcoins.feeprovider.*
import org.bitcoins.node.Node import org.bitcoins.node.Node
import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.NodeStateDescriptorDAO import org.bitcoins.node.models.NodeStateDescriptorDAO
import org.bitcoins.rpc.BitcoindCallbacks
import org.bitcoins.rpc.BitcoindException.InWarmUp import org.bitcoins.rpc.BitcoindException.InWarmUp
import org.bitcoins.rpc.client.common.BitcoindRpcClient import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.rpc.config.{BitcoindRpcAppConfig, ZmqConfig} import org.bitcoins.rpc.config.{BitcoindRpcAppConfig, ZmqConfig}
import org.bitcoins.server.bitcoind.BitcoindSyncState import org.bitcoins.server.bitcoind.BitcoindSyncState
import org.bitcoins.server.routes.{BitcoinSServerRunner, CommonRoutes, Server} import org.bitcoins.server.routes.{BitcoinSServerRunner, CommonRoutes, Server}
import org.bitcoins.server.util._ import org.bitcoins.server.util.*
import org.bitcoins.tor.config.TorAppConfig import org.bitcoins.tor.config.TorAppConfig
import org.bitcoins.wallet.WalletHolder import org.bitcoins.wallet.WalletHolder
import org.bitcoins.wallet.config.WalletAppConfig import org.bitcoins.wallet.config.WalletAppConfig
@ -401,14 +402,6 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
} yield { } yield {
WebsocketUtil.buildChainCallbacks(wsQueue, bitcoind) WebsocketUtil.buildChainCallbacks(wsQueue, bitcoind)
} }
val nodeApiF = for {
bitcoind <- bitcoindF
chainCallbacks <- chainCallbacksF
} yield BitcoindRpcBackendUtil.buildBitcoindNodeApi(
bitcoind,
Future.successful(walletHolder),
Some(chainCallbacks)
)
val feeProviderF = bitcoindF.map { bitcoind => val feeProviderF = bitcoindF.map { bitcoind =>
FeeProviderFactory.getFeeProviderOrElse( FeeProviderFactory.getFeeProviderOrElse(
@ -423,13 +416,12 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
val loadWalletApiF = { val loadWalletApiF = {
for { for {
bitcoind <- bitcoindF bitcoind <- bitcoindF
nodeApi <- nodeApiF
feeProvider <- feeProviderF feeProvider <- feeProviderF
} yield { } yield {
val l = DLCWalletBitcoindBackendLoader( val l = DLCWalletBitcoindBackendLoader(
walletHolder = walletHolder, walletHolder = walletHolder,
bitcoind = bitcoind, bitcoind = bitcoind,
nodeApi = nodeApi, nodeApi = bitcoind,
feeProvider = feeProvider feeProvider = feeProvider
) )
@ -447,6 +439,11 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
Some(walletName), Some(walletName),
conf.walletConf.aesPasswordOpt conf.walletConf.aesPasswordOpt
) )
bitcoind <- bitcoindF
walletHolder = result._1
callback = BitcoindCallbacks.onBlockReceived(
walletHolder.processBlock(_).map(_ => ()))
_ = bitcoind.bitcoindRpcAppConfig.addCallbacks(callback)
} yield result } yield result
} }
@ -629,7 +626,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
pollingCancellable <- syncF.flatMap { _ => pollingCancellable <- syncF.flatMap { _ =>
if (bitcoindRpcConf.zmqConfig == ZmqConfig.empty) { if (bitcoindRpcConf.zmqConfig == ZmqConfig.empty) {
val blockingPollingCancellable = BitcoindRpcBackendUtil val blockingPollingCancellable = BitcoindRpcBackendUtil
.startBitcoindBlockPolling(wallet, bitcoind, chainCallbacksOpt) .startBitcoindBlockPolling(wallet, bitcoind, chainCallbacksOpt)(
nodeConf.callBacks.executeOnBlockReceivedCallbacks(_))
val mempoolCancellable = BitcoindRpcBackendUtil val mempoolCancellable = BitcoindRpcBackendUtil
.startBitcoindMempoolPolling(wallet, bitcoind) { tx => .startBitcoindMempoolPolling(wallet, bitcoind) { tx =>
nodeConf.callBacks nodeConf.callBacks

View File

@ -443,7 +443,8 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
bitcoind: BitcoindRpcClient, bitcoind: BitcoindRpcClient,
chainCallbacksOpt: Option[ChainCallbacks], chainCallbacksOpt: Option[ChainCallbacks],
interval: FiniteDuration = 10.seconds interval: FiniteDuration = 10.seconds
)(implicit system: ActorSystem): Cancellable = { )(processBlock: Block => Future[Unit])(implicit
system: ActorSystem): Cancellable = {
import system.dispatcher import system.dispatcher
val processingBitcoindBlocks = new AtomicBoolean(false) val processingBitcoindBlocks = new AtomicBoolean(false)
@ -451,7 +452,6 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () => system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
{ {
val isBitcoindSyncedF = isBitcoindInSync(bitcoind) val isBitcoindSyncedF = isBitcoindInSync(bitcoind)
isBitcoindSyncedF.map { isBitcoindSynced => isBitcoindSyncedF.map { isBitcoindSynced =>
if (!isBitcoindSynced) { if (!isBitcoindSynced) {
logger.info(s"Bitcoind is not synced, waiting for IBD to complete.") logger.info(s"Bitcoind is not synced, waiting for IBD to complete.")
@ -463,10 +463,10 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
if (!rescanning) { if (!rescanning) {
val pollFOptF = val pollFOptF =
pollBitcoind( pollBitcoind(
wallet = wallet,
bitcoind = bitcoind, bitcoind = bitcoind,
chainCallbacksOpt = chainCallbacksOpt, chainCallbacksOpt = chainCallbacksOpt,
prevCount = walletSyncState.height prevCount = walletSyncState.height,
processBlock = processBlock
) )
pollFOptF.flatMap { pollFOptF.flatMap {
@ -484,9 +484,9 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
f.onComplete { _ => f.onComplete { _ =>
processingBitcoindBlocks.set(false) processingBitcoindBlocks.set(false)
BitcoindRpcBackendUtil.setSyncingFlag( BitcoindRpcBackendUtil.setSyncingFlag(
false, syncing = false,
bitcoind, bitcoind = bitcoind,
chainCallbacksOpt chainCallbacksOpt = chainCallbacksOpt
) )
} // reset polling variable } // reset polling variable
f.failed.foreach(err => f.failed.foreach(err =>
@ -506,10 +506,10 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
* completed when the sync is finished. * completed when the sync is finished.
*/ */
private def pollBitcoind( private def pollBitcoind(
wallet: WalletApi,
bitcoind: BitcoindRpcClient, bitcoind: BitcoindRpcClient,
chainCallbacksOpt: Option[ChainCallbacks], chainCallbacksOpt: Option[ChainCallbacks],
prevCount: Int prevCount: Int,
processBlock: Block => Future[Unit]
)(implicit system: ActorSystem): Future[Option[Future[Done]]] = { )(implicit system: ActorSystem): Future[Option[Future[Done]]] = {
import system.dispatcher import system.dispatcher
val atomicPrevCount = new AtomicInteger(prevCount) val atomicPrevCount = new AtomicInteger(prevCount)
@ -523,7 +523,7 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
val processBlockSink: Sink[(Block, GetBlockHeaderResult), Future[Done]] = { val processBlockSink: Sink[(Block, GetBlockHeaderResult), Future[Done]] = {
Sink.foreachAsync[(Block, GetBlockHeaderResult)](1) { Sink.foreachAsync[(Block, GetBlockHeaderResult)](1) {
case (block, blockHeaderResult) => case (block, blockHeaderResult) =>
val processBlocksF = wallet.processBlock(block) val processBlocksF = processBlock(block)
processBlocksF.failed.foreach { case err => processBlocksF.failed.foreach { case err =>
val failedCount = atomicPrevCount.get val failedCount = atomicPrevCount.get

View File

@ -116,7 +116,20 @@ object CallbackUtil extends BitcoinSLogger {
.runWith(txSink) .runWith(txSink)
.map(_ => ()) .map(_ => ())
} }
val callbacks = NodeCallbacks(onTxReceived = Vector(onTx))
val blockSink = Sink.foreachAsync[Block](1) { block =>
wallet
.processBlock(block)
.map(_ => ())
}
val onBlock: OnBlockReceived = { block =>
Source
.single(block)
.runWith(blockSink)
.map(_ => ())
}
val callbacks = NodeCallbacks(onTxReceived = Vector(onTx),
onBlockReceived = Vector(onBlock))
val streamManager = NodeCallbackStreamManager(callbacks) val streamManager = NodeCallbackStreamManager(callbacks)
Future.successful(streamManager) Future.successful(streamManager)
} }

View File

@ -38,7 +38,7 @@ import scala.concurrent.Future
*/ */
class BitcoindRpcClient(override val instance: BitcoindInstance)(implicit class BitcoindRpcClient(override val instance: BitcoindInstance)(implicit
override val system: ActorSystem, override val system: ActorSystem,
bitcoindRpcAppConfig: BitcoindRpcAppConfig val bitcoindRpcAppConfig: BitcoindRpcAppConfig
) extends Client ) extends Client
with FeeRateApi with FeeRateApi
with NodeApi with NodeApi

View File

@ -31,6 +31,10 @@
<logger name="org.bitcoins.wallet" level="WARN"/> <logger name="org.bitcoins.wallet" level="WARN"/>
<logger name="org.bitcoins.dlc.wallet" level="WARN"/>
<logger name="org.bitcoins.server" level="WARN"/>
<!-- see what slick is compiling to in sql --> <!-- see what slick is compiling to in sql -->
<logger name="slick" level="OFF"/> <logger name="slick" level="OFF"/>

View File

@ -52,7 +52,7 @@ class BitcoindBlockPollingTest
bitcoind, bitcoind,
None, None,
1.second 1.second
) )(wallet.processBlock(_).map(_ => ()))
_ <- bitcoind.generateToAddress(6, bech32Address) _ <- bitcoind.generateToAddress(6, bech32Address)
// Wait for it to process // Wait for it to process