diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala index 978a8d2736..872750952a 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala @@ -221,13 +221,15 @@ object BitcoindRpcBackendUtil extends Logging { def startZMQWalletCallbacks( wallet: NeutrinoHDWalletApi, - zmqConfig: ZmqConfig)(implicit ec: ExecutionContext): Unit = { + zmqConfig: ZmqConfig)(implicit + ec: ExecutionContext): WalletZmqSubscribers = { require(zmqConfig != ZmqConfig.empty, "Must have the zmq raw configs defined to setup ZMQ callbacks") - zmqConfig.rawTx.foreach { zmq => + val rawTxSub = zmqConfig.rawTx.map { zmq => val rawTxListener: Option[Transaction => Unit] = Some { { tx: Transaction => + println(s"Received tx ${tx.txIdBE.hex}, processing") logger.debug(s"Received tx ${tx.txIdBE.hex}, processing") val f = wallet.processTransaction(tx, None) f.failed.foreach { err => @@ -241,14 +243,15 @@ object BitcoindRpcBackendUtil extends Logging { hashTxListener = None, hashBlockListener = None, rawTxListener = rawTxListener, - rawBlockListener = None).start() + rawBlockListener = None) } - zmqConfig.rawBlock.foreach { zmq => + val rawBlockSub = zmqConfig.rawBlock.map { zmq => val rawBlockListener: Option[Block => Unit] = Some { { block: Block => logger.info( s"Received block ${block.blockHeader.hashBE.hex}, processing") + println(s"Received block ${block.blockHeader.hashBE.hex}, processing") val f = wallet.processBlock(block) f.failed.foreach { err => logger.error("failed to process raw block zmq message", err) @@ -261,8 +264,12 @@ object BitcoindRpcBackendUtil extends Logging { hashTxListener = None, hashBlockListener = None, rawTxListener = None, - rawBlockListener = rawBlockListener).start() + rawBlockListener = rawBlockListener) } + + val subs = WalletZmqSubscribers(rawTxSub, rawBlockSub) + subs.start() + subs } def createDLCWalletWithBitcoindCallbacks( diff --git a/app/server/src/main/scala/org/bitcoins/server/WalletZmqSubscribers.scala b/app/server/src/main/scala/org/bitcoins/server/WalletZmqSubscribers.scala new file mode 100644 index 0000000000..df0f38c64a --- /dev/null +++ b/app/server/src/main/scala/org/bitcoins/server/WalletZmqSubscribers.scala @@ -0,0 +1,34 @@ +package org.bitcoins.server + +import org.bitcoins.core.util.StartStop +import org.bitcoins.zmq.ZMQSubscriber + +import java.util.concurrent.atomic.AtomicBoolean + +case class WalletZmqSubscribers( + rawTxSubscriberOpt: Option[ZMQSubscriber], + rawBlockSubscriberOpt: Option[ZMQSubscriber]) + extends StartStop[Unit] { + private val isStarted: AtomicBoolean = new AtomicBoolean(false) + + override def start(): Unit = { + if (isStarted.get()) { + () + } else { + rawTxSubscriberOpt.foreach(_.start()) + rawBlockSubscriberOpt.foreach(_.start()) + isStarted.set(true) + } + } + + override def stop(): Unit = { + if (isStarted.get()) { + rawTxSubscriberOpt.foreach(_.stop()) + rawBlockSubscriberOpt.foreach(_.stop()) + isStarted.set(false) + } else { + () + } + + } +} diff --git a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/util/RpcUtil.scala b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/util/RpcUtil.scala index e9e167f237..35f0f7b154 100644 --- a/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/util/RpcUtil.scala +++ b/bitcoind-rpc/src/main/scala/org/bitcoins/rpc/util/RpcUtil.scala @@ -25,10 +25,13 @@ abstract class RpcUtil extends AsyncUtil { /** Genreates a zmq config with unused ports */ def zmqConfig: ZmqConfig = { ZmqConfig( - hashBlock = Some(new InetSocketAddress(randomPort)), - hashTx = Some(new InetSocketAddress(randomPort)), - rawTx = Some(new InetSocketAddress(randomPort)), - rawBlock = Some(new InetSocketAddress(randomPort)) + hashBlock = + Some(InetSocketAddress.createUnresolved("127.0.0.1", randomPort)), + hashTx = + Some(InetSocketAddress.createUnresolved("127.0.0.1", randomPort)), + rawTx = Some(InetSocketAddress.createUnresolved("127.0.0.1", randomPort)), + rawBlock = + Some(InetSocketAddress.createUnresolved("127.0.0.1", randomPort)) ) } } diff --git a/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindZMQBackendTest.scala b/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindZMQBackendTest.scala index f5f738ed99..71a0bf2641 100644 --- a/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindZMQBackendTest.scala +++ b/wallet-test/src/test/scala/org/bitcoins/wallet/BitcoindZMQBackendTest.scala @@ -60,7 +60,7 @@ class BitcoindZMQBackendTest extends WalletAppConfigWithBitcoindNewestFixtures { _ = assert(firstBalance == Satoshis.zero) // Setup zmq subscribers - _ = BitcoindRpcBackendUtil.startZMQWalletCallbacks( + zmqSubs = BitcoindRpcBackendUtil.startZMQWalletCallbacks( wallet, bitcoind.instance.zmqConfig) _ <- AsyncUtil.nonBlockingSleep(5.seconds) @@ -72,9 +72,10 @@ class BitcoindZMQBackendTest extends WalletAppConfigWithBitcoindNewestFixtures { confirmed <- wallet.getConfirmedBalance() _ = assert(confirmed == Satoshis.zero) - _ <- attemptZMQBlock(6, wallet) + _ <- attemptZMQBlock(walletAppConfig.requiredConfirmations, wallet) balance <- wallet.getConfirmedBalance() + _ = zmqSubs.stop() } yield { // use >= because of multiple attempts assert(balance >= amountToSend) diff --git a/zmq/src/main/scala/org/bitcoins/zmq/ZMQSubscriber.scala b/zmq/src/main/scala/org/bitcoins/zmq/ZMQSubscriber.scala index 054d03ecbd..0f856fda7a 100644 --- a/zmq/src/main/scala/org/bitcoins/zmq/ZMQSubscriber.scala +++ b/zmq/src/main/scala/org/bitcoins/zmq/ZMQSubscriber.scala @@ -30,7 +30,7 @@ class ZMQSubscriber( extends Logging with StartStop[Unit] { - private var running = true + private var isConnected = false private val context = ZMQ.context(1) private val subscriber: ZMQ.Socket = context.socket(SocketType.SUB) @@ -40,54 +40,24 @@ class ZMQSubscriber( private case object SubscriberRunnable extends Runnable { override def run(): Unit = { - logger.info(s"ZmqSubscriber connecting to uri=$uri") - subscriber.setLinger(2000) - val isConnected = subscriber.connect(s"tcp://$uri") - - if (isConnected) { - hashTxListener.foreach { _ => - subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET)) - logger.debug("subscribed to the transaction hashes from zmq") - } - - rawTxListener.foreach { _ => - subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET)) - logger.debug("subscribed to raw transactions from zmq") - } - - hashBlockListener.foreach { _ => - subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET)) - logger.debug("subscribed to the hashblock stream from zmq") - } - - rawBlockListener.foreach { _ => - subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET)) - logger.debug("subscribed to raw block stream from zmq") - } - - while (running && !subscriberThread.isInterrupted) { - try { - val zmsg = ZMsg.recvMsg(subscriber, ZMQ.NOBLOCK) - if (zmsg != null) { - val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET) - val body = zmsg.pop().getData - processMsg(notificationTypeStr, body) - } else { - Thread.sleep(100) - } - } catch { - case e: ZMQException if e.getErrorCode == ZMQ.Error.ETERM.getCode => - context.term() - logger.info(s"Done terminating zmq context msg=${e.getMessage}") - case e: Exception => - context.term() - logger.info(s"Done terminating zmq context msg=${e.getMessage}") + while (isConnected && !subscriberThread.isInterrupted) { + try { + val zmsg = ZMsg.recvMsg(subscriber, ZMQ.NOBLOCK) + if (zmsg != null) { + val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET) + val body = zmsg.pop().getData + processMsg(notificationTypeStr, body) + } else { + Thread.sleep(1) } + } catch { + case e: ZMQException if e.getErrorCode == ZMQ.Error.ETERM.getCode => + context.term() + logger.info(s"Done terminating zmq context msg=${e.getMessage}") + case e: Exception => + context.term() + logger.info(s"Done terminating zmq context msg=${e.getMessage}") } - logger.info(s"Terminated") - } else { - logger.error(s"Failed to connect to zmq socket ${uri}") - throw new RuntimeException(s"Failed to connect to zmq socket ${uri}") } } @@ -99,8 +69,38 @@ class ZMQSubscriber( subscriberThread.setDaemon(true) override def start(): Unit = { - logger.info("starting zmq") + logger.info(s"ZmqSubscriber connecting to uri=$uri") + + isConnected = subscriber.connect(s"tcp://$uri") subscriberThread.start() + if (isConnected) { + hashTxListener.foreach { _ => + val result = subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET)) + if (result) logger.debug("subscribed to hashtxs stream from zmq") + else logger.error(s"Failed to subscribe to ${HashTx.topic}") + } + + rawTxListener.foreach { _ => + val result = subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET)) + if (result) logger.debug("subscribed to rawtxs stream from zmq") + else logger.error(s"Failed to subscribe to ${RawTx.topic}") + } + + hashBlockListener.foreach { _ => + val result = subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET)) + if (result) logger.debug("subscribed to hashblock stream from zmq") + else logger.error(s"Failed to subscribe to ${HashBlock.topic}") + } + + rawBlockListener.foreach { _ => + val result = subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET)) + if (result) logger.debug("subscribed to raw block stream from zmq") + else logger.error(s"Failed to subscribe to ${RawBlock.topic}") + } + } else { + logger.error(s"Failed to connect to zmq socket ${uri}") + throw new RuntimeException(s"Failed to connect to zmq socket ${uri}") + } } /** Stops running the zmq subscriber and cleans up after zmq @@ -111,7 +111,7 @@ class ZMQSubscriber( //i think this could technically not work, because currently we are blocking //on Zmsg.recvMsg in our while loop. If we don't get another message we won't //be able toe evaluate the while loop again. Moving forward with this for now. - running = false + isConnected = false subscriber.close() logger.info("Attempting to terminate context") context.term()