Add WalletZmqSubscribers (#5423)

Move connection logic into ZMQSubscriber.start() rather than SubscriberRunnable
This commit is contained in:
Chris Stewart 2024-02-26 11:28:39 -06:00 committed by GitHub
parent dae6ebf9aa
commit cb26f6de07
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 105 additions and 60 deletions

View file

@ -221,13 +221,15 @@ object BitcoindRpcBackendUtil extends Logging {
def startZMQWalletCallbacks(
wallet: NeutrinoHDWalletApi,
zmqConfig: ZmqConfig)(implicit ec: ExecutionContext): Unit = {
zmqConfig: ZmqConfig)(implicit
ec: ExecutionContext): WalletZmqSubscribers = {
require(zmqConfig != ZmqConfig.empty,
"Must have the zmq raw configs defined to setup ZMQ callbacks")
zmqConfig.rawTx.foreach { zmq =>
val rawTxSub = zmqConfig.rawTx.map { zmq =>
val rawTxListener: Option[Transaction => Unit] = Some {
{ tx: Transaction =>
println(s"Received tx ${tx.txIdBE.hex}, processing")
logger.debug(s"Received tx ${tx.txIdBE.hex}, processing")
val f = wallet.processTransaction(tx, None)
f.failed.foreach { err =>
@ -241,14 +243,15 @@ object BitcoindRpcBackendUtil extends Logging {
hashTxListener = None,
hashBlockListener = None,
rawTxListener = rawTxListener,
rawBlockListener = None).start()
rawBlockListener = None)
}
zmqConfig.rawBlock.foreach { zmq =>
val rawBlockSub = zmqConfig.rawBlock.map { zmq =>
val rawBlockListener: Option[Block => Unit] = Some {
{ block: Block =>
logger.info(
s"Received block ${block.blockHeader.hashBE.hex}, processing")
println(s"Received block ${block.blockHeader.hashBE.hex}, processing")
val f = wallet.processBlock(block)
f.failed.foreach { err =>
logger.error("failed to process raw block zmq message", err)
@ -261,8 +264,12 @@ object BitcoindRpcBackendUtil extends Logging {
hashTxListener = None,
hashBlockListener = None,
rawTxListener = None,
rawBlockListener = rawBlockListener).start()
rawBlockListener = rawBlockListener)
}
val subs = WalletZmqSubscribers(rawTxSub, rawBlockSub)
subs.start()
subs
}
def createDLCWalletWithBitcoindCallbacks(

View file

@ -0,0 +1,34 @@
package org.bitcoins.server
import org.bitcoins.core.util.StartStop
import org.bitcoins.zmq.ZMQSubscriber
import java.util.concurrent.atomic.AtomicBoolean
case class WalletZmqSubscribers(
rawTxSubscriberOpt: Option[ZMQSubscriber],
rawBlockSubscriberOpt: Option[ZMQSubscriber])
extends StartStop[Unit] {
private val isStarted: AtomicBoolean = new AtomicBoolean(false)
override def start(): Unit = {
if (isStarted.get()) {
()
} else {
rawTxSubscriberOpt.foreach(_.start())
rawBlockSubscriberOpt.foreach(_.start())
isStarted.set(true)
}
}
override def stop(): Unit = {
if (isStarted.get()) {
rawTxSubscriberOpt.foreach(_.stop())
rawBlockSubscriberOpt.foreach(_.stop())
isStarted.set(false)
} else {
()
}
}
}

View file

@ -25,10 +25,13 @@ abstract class RpcUtil extends AsyncUtil {
/** Genreates a zmq config with unused ports */
def zmqConfig: ZmqConfig = {
ZmqConfig(
hashBlock = Some(new InetSocketAddress(randomPort)),
hashTx = Some(new InetSocketAddress(randomPort)),
rawTx = Some(new InetSocketAddress(randomPort)),
rawBlock = Some(new InetSocketAddress(randomPort))
hashBlock =
Some(InetSocketAddress.createUnresolved("127.0.0.1", randomPort)),
hashTx =
Some(InetSocketAddress.createUnresolved("127.0.0.1", randomPort)),
rawTx = Some(InetSocketAddress.createUnresolved("127.0.0.1", randomPort)),
rawBlock =
Some(InetSocketAddress.createUnresolved("127.0.0.1", randomPort))
)
}
}

View file

@ -60,7 +60,7 @@ class BitcoindZMQBackendTest extends WalletAppConfigWithBitcoindNewestFixtures {
_ = assert(firstBalance == Satoshis.zero)
// Setup zmq subscribers
_ = BitcoindRpcBackendUtil.startZMQWalletCallbacks(
zmqSubs = BitcoindRpcBackendUtil.startZMQWalletCallbacks(
wallet,
bitcoind.instance.zmqConfig)
_ <- AsyncUtil.nonBlockingSleep(5.seconds)
@ -72,9 +72,10 @@ class BitcoindZMQBackendTest extends WalletAppConfigWithBitcoindNewestFixtures {
confirmed <- wallet.getConfirmedBalance()
_ = assert(confirmed == Satoshis.zero)
_ <- attemptZMQBlock(6, wallet)
_ <- attemptZMQBlock(walletAppConfig.requiredConfirmations, wallet)
balance <- wallet.getConfirmedBalance()
_ = zmqSubs.stop()
} yield {
// use >= because of multiple attempts
assert(balance >= amountToSend)

View file

@ -30,7 +30,7 @@ class ZMQSubscriber(
extends Logging
with StartStop[Unit] {
private var running = true
private var isConnected = false
private val context = ZMQ.context(1)
private val subscriber: ZMQ.Socket = context.socket(SocketType.SUB)
@ -40,54 +40,24 @@ class ZMQSubscriber(
private case object SubscriberRunnable extends Runnable {
override def run(): Unit = {
logger.info(s"ZmqSubscriber connecting to uri=$uri")
subscriber.setLinger(2000)
val isConnected = subscriber.connect(s"tcp://$uri")
if (isConnected) {
hashTxListener.foreach { _ =>
subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to the transaction hashes from zmq")
}
rawTxListener.foreach { _ =>
subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to raw transactions from zmq")
}
hashBlockListener.foreach { _ =>
subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to the hashblock stream from zmq")
}
rawBlockListener.foreach { _ =>
subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to raw block stream from zmq")
}
while (running && !subscriberThread.isInterrupted) {
try {
val zmsg = ZMsg.recvMsg(subscriber, ZMQ.NOBLOCK)
if (zmsg != null) {
val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET)
val body = zmsg.pop().getData
processMsg(notificationTypeStr, body)
} else {
Thread.sleep(100)
}
} catch {
case e: ZMQException if e.getErrorCode == ZMQ.Error.ETERM.getCode =>
context.term()
logger.info(s"Done terminating zmq context msg=${e.getMessage}")
case e: Exception =>
context.term()
logger.info(s"Done terminating zmq context msg=${e.getMessage}")
while (isConnected && !subscriberThread.isInterrupted) {
try {
val zmsg = ZMsg.recvMsg(subscriber, ZMQ.NOBLOCK)
if (zmsg != null) {
val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET)
val body = zmsg.pop().getData
processMsg(notificationTypeStr, body)
} else {
Thread.sleep(1)
}
} catch {
case e: ZMQException if e.getErrorCode == ZMQ.Error.ETERM.getCode =>
context.term()
logger.info(s"Done terminating zmq context msg=${e.getMessage}")
case e: Exception =>
context.term()
logger.info(s"Done terminating zmq context msg=${e.getMessage}")
}
logger.info(s"Terminated")
} else {
logger.error(s"Failed to connect to zmq socket ${uri}")
throw new RuntimeException(s"Failed to connect to zmq socket ${uri}")
}
}
@ -99,8 +69,38 @@ class ZMQSubscriber(
subscriberThread.setDaemon(true)
override def start(): Unit = {
logger.info("starting zmq")
logger.info(s"ZmqSubscriber connecting to uri=$uri")
isConnected = subscriber.connect(s"tcp://$uri")
subscriberThread.start()
if (isConnected) {
hashTxListener.foreach { _ =>
val result = subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET))
if (result) logger.debug("subscribed to hashtxs stream from zmq")
else logger.error(s"Failed to subscribe to ${HashTx.topic}")
}
rawTxListener.foreach { _ =>
val result = subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET))
if (result) logger.debug("subscribed to rawtxs stream from zmq")
else logger.error(s"Failed to subscribe to ${RawTx.topic}")
}
hashBlockListener.foreach { _ =>
val result = subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET))
if (result) logger.debug("subscribed to hashblock stream from zmq")
else logger.error(s"Failed to subscribe to ${HashBlock.topic}")
}
rawBlockListener.foreach { _ =>
val result = subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET))
if (result) logger.debug("subscribed to raw block stream from zmq")
else logger.error(s"Failed to subscribe to ${RawBlock.topic}")
}
} else {
logger.error(s"Failed to connect to zmq socket ${uri}")
throw new RuntimeException(s"Failed to connect to zmq socket ${uri}")
}
}
/** Stops running the zmq subscriber and cleans up after zmq
@ -111,7 +111,7 @@ class ZMQSubscriber(
//i think this could technically not work, because currently we are blocking
//on Zmsg.recvMsg in our while loop. If we don't get another message we won't
//be able toe evaluate the while loop again. Moving forward with this for now.
running = false
isConnected = false
subscriber.close()
logger.info("Attempting to terminate context")
context.term()