Fix ZMQ Config with bitcoind backend (#2897)

* Fix ZMQ Config for bitcoind backend

* Fix nits
This commit is contained in:
benthecarman 2021-04-13 14:56:28 -05:00 committed by GitHub
parent be14de459e
commit b1be3347c9
6 changed files with 82 additions and 44 deletions

View file

@ -18,6 +18,7 @@ import org.bitcoins.feeprovider._
import org.bitcoins.node._
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.rpc.config.ZmqConfig
import org.bitcoins.server.routes.{BitcoinSRunner, Server}
import org.bitcoins.wallet.Wallet
import org.bitcoins.wallet.config.WalletAppConfig
@ -147,15 +148,14 @@ class BitcoinSServerMain(override val args: Array[String])
blockCount <- bitcoind.getBlockCount
// Create callbacks for processing new blocks
_ = bitcoindRpcConf.zmqPortOpt match {
case Some(_) =>
val zmq = BitcoindRpcBackendUtil.createZMQWalletCallbacks(wallet)
zmq.start()
case None =>
_ =
if (bitcoindRpcConf.zmqConfig == ZmqConfig.empty) {
BitcoindRpcBackendUtil.startBitcoindBlockPolling(wallet,
bitcoind,
blockCount)
}
} else {
BitcoindRpcBackendUtil.startZMQWalletCallbacks(wallet)
}
binding <- startHttpServer(nodeApi = bitcoind,
chainApi = bitcoind,

View file

@ -1,9 +1,5 @@
package org.bitcoins.server
import java.io.File
import java.net.URI
import java.nio.file._
import akka.actor.ActorSystem
import com.typesafe.config.Config
import org.bitcoins.db._
@ -12,6 +8,9 @@ import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.rpc.config._
import java.io.File
import java.net.{InetSocketAddress, URI}
import java.nio.file._
import scala.concurrent.{ExecutionContext, Future}
/** Configuration for a BitcoindRpcClient
@ -91,13 +90,32 @@ case class BitcoindRpcAppConfig(
lazy val authCredentials: BitcoindAuthCredentials =
BitcoindAuthCredentials.PasswordBased(rpcUser, rpcPassword)
lazy val zmqPortOpt: Option[Int] =
config.getIntOpt("bitcoin-s.bitcoind-rpc.zmqport")
lazy val zmqRawBlock: Option[InetSocketAddress] =
config.getStringOrNone("bitcoin-s.bitcoind-rpc.zmqpubrawblock").map { str =>
val uri = URI.create(str)
new InetSocketAddress(uri.getHost, uri.getPort)
}
lazy val zmqConfig: ZmqConfig = zmqPortOpt match {
case Some(zmqPort) => ZmqConfig.fromPort(zmqPort)
case None => ZmqConfig()
}
lazy val zmqRawTx: Option[InetSocketAddress] =
config.getStringOrNone("bitcoin-s.bitcoind-rpc.zmqpubrawtx").map { str =>
val uri = URI.create(str)
new InetSocketAddress(uri.getHost, uri.getPort)
}
lazy val zmqHashBlock: Option[InetSocketAddress] =
config.getStringOrNone("bitcoin-s.bitcoind-rpc.zmqpubashblock").map { str =>
val uri = URI.create(str)
new InetSocketAddress(uri.getHost, uri.getPort)
}
lazy val zmqHashTx: Option[InetSocketAddress] =
config.getStringOrNone("bitcoin-s.bitcoind-rpc.zmqpubashtx").map { str =>
val uri = URI.create(str)
new InetSocketAddress(uri.getHost, uri.getPort)
}
lazy val zmqConfig: ZmqConfig =
ZmqConfig(zmqHashBlock, zmqRawBlock, zmqHashTx, zmqRawTx)
lazy val bitcoindInstance: BitcoindInstance =
BitcoindInstance(network = network,

View file

@ -1,18 +1,17 @@
package org.bitcoins.server
import akka.actor.{ActorSystem, Cancellable}
import grizzled.slf4j.Logging
import org.bitcoins.core.api.node.NodeApi
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.FutureUtil
import grizzled.slf4j.Logging
import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.rpc.config.ZmqConfig
import org.bitcoins.wallet.Wallet
import org.bitcoins.zmq.ZMQSubscriber
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future, Promise}
@ -105,34 +104,43 @@ object BitcoindRpcBackendUtil extends Logging {
pairedWallet
}
def createZMQWalletCallbacks(wallet: Wallet)(implicit
bitcoindRpcConf: BitcoindRpcAppConfig): ZMQSubscriber = {
require(bitcoindRpcConf.zmqPortOpt.isDefined,
"Must have the zmq port defined to setup ZMQ callbacks")
val zmqSocket =
new InetSocketAddress("tcp://127.0.0.1", bitcoindRpcConf.zmqPortOpt.get)
def startZMQWalletCallbacks(wallet: Wallet)(implicit
bitcoindRpcConf: BitcoindRpcAppConfig): Unit = {
require(bitcoindRpcConf.zmqConfig != ZmqConfig.empty,
"Must have the zmq raw configs defined to setup ZMQ callbacks")
val rawTxListener: Option[Transaction => Unit] = Some {
{ tx: Transaction =>
logger.debug(s"Received tx ${tx.txIdBE}, processing")
wallet.processTransaction(tx, None)
()
bitcoindRpcConf.zmqRawTx.foreach { zmq =>
val rawTxListener: Option[Transaction => Unit] = Some {
{ tx: Transaction =>
logger.debug(s"Received tx ${tx.txIdBE.hex}, processing")
wallet.processTransaction(tx, None)
()
}
}
new ZMQSubscriber(socket = zmq,
hashTxListener = None,
hashBlockListener = None,
rawTxListener = rawTxListener,
rawBlockListener = None).start()
}
val rawBlockListener: Option[Block => Unit] = Some {
{ block: Block =>
logger.debug(s"Received block ${block.blockHeader.hashBE}, processing")
wallet.processBlock(block)
()
bitcoindRpcConf.zmqRawBlock.foreach { zmq =>
val rawBlockListener: Option[Block => Unit] = Some {
{ block: Block =>
logger.debug(
s"Received block ${block.blockHeader.hashBE.hex}, processing")
wallet.processBlock(block)
()
}
}
}
new ZMQSubscriber(socket = zmqSocket,
hashTxListener = None,
hashBlockListener = None,
rawTxListener = rawTxListener,
rawBlockListener = rawBlockListener)
new ZMQSubscriber(socket = zmq,
hashTxListener = None,
hashBlockListener = None,
rawTxListener = None,
rawBlockListener = rawBlockListener).start()
}
}
private def getNodeApiWalletCallback(

View file

@ -13,6 +13,8 @@ sealed trait ZmqConfig {
object ZmqConfig extends Logging {
val empty: ZmqConfig = ZmqConfig()
private case class ZmqConfigImpl(
hashBlock: Option[InetSocketAddress],
rawBlock: Option[InetSocketAddress],

View file

@ -144,6 +144,14 @@ bitcoin-s {
rpcport = 8332
# bitcoind zmq port for all services
zmqport = 29000
# bitcoind zmq raw tx
zmqpubrawtx = "tcp://127.0.0.1:28332"
# bitcoind zmq raw block
zmqpubrawblock = "tcp://127.0.0.1:28333"
# bitcoind zmq hash tx
zmqpubhashtx = "tcp://127.0.0.1:28330"
# bitcoind zmq raw block
zmqpubhashblock = "tcp://127.0.0.1:28331"
}
node {

View file

@ -153,8 +153,10 @@ bitcoin-s {
rpcbind = localhost
# bitcoind rpc port
rpcport = 8332
# bitcoind zmq port for all services
zmqport = 29000
# bitcoind zmq raw tx
zmqpubrawtx = "tcp://127.0.0.1:28332"
# bitcoind zmq raw block
zmqpubrawblock = "tcp://127.0.0.1:28333"
}
```
@ -175,4 +177,4 @@ $ unzip chaindb-testnet-2021-02-03.zip
$ mv chaindb.sqlite ~/.bitcoin-s/testnet/
```
This should take a couple minutes to execute, but once it is done, you will only have a short while left to sync once you start your server.
This should take a couple minutes to execute, but once it is done, you will only have a short while left to sync once you start your server.