Make ZMQ Listeners typed (#2144)

This commit is contained in:
Ben Carman 2020-10-08 01:21:50 -05:00 committed by GitHub
parent fc1557cbe9
commit 7b53b02cfa
4 changed files with 54 additions and 56 deletions

View file

@ -10,7 +10,6 @@ import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.wallet.Wallet
import org.bitcoins.zmq.ZMQSubscriber
import scodec.bits.ByteVector
import scala.concurrent.{ExecutionContext, Future, Promise}
@ -48,18 +47,16 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
val zmqSocket =
new InetSocketAddress("tcp://127.0.0.1", bitcoindRpcConf.zmqPort)
val rawTxListener: Option[ByteVector => Unit] = Some {
{ bytes: ByteVector =>
val tx = Transaction(bytes)
val rawTxListener: Option[Transaction => Unit] = Some {
{ tx: Transaction =>
logger.debug(s"Received tx ${tx.txIdBE}, processing")
wallet.processTransaction(tx, None)
()
}
}
val rawBlockListener: Option[ByteVector => Unit] = Some {
{ bytes: ByteVector =>
val block = Block(bytes)
val rawBlockListener: Option[Block => Unit] = Some {
{ block: Block =>
logger.debug(s"Received block ${block.blockHeader.hashBE}, processing")
wallet.processBlock(block)
()

View file

@ -29,7 +29,6 @@ import org.bitcoins.testkit.{chain, BitcoinSTestAppConfig}
import org.bitcoins.zmq.ZMQSubscriber
import org.scalatest._
import play.api.libs.json.{JsError, JsSuccess, Json}
import scodec.bits.ByteVector
import scala.annotation.tailrec
import scala.concurrent.{ExecutionContext, Future}
@ -223,10 +222,8 @@ trait ChainUnitTest
val zmqRawBlockUriOpt: Option[InetSocketAddress] =
bitcoind.instance.zmqConfig.rawBlock
val handleRawBlock: ByteVector => Unit = { bytes: ByteVector =>
val block = Block.fromBytes(bytes)
val handleRawBlock: Block => Unit = { block: Block =>
chainHandlerF.flatMap(_.processHeader(block.blockHeader))
()
}

View file

@ -2,7 +2,10 @@ package org.bitcoins.zmq
import java.net.InetSocketAddress
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.zeromq.{SocketType, ZMQ, ZMQException, ZMsg}
import scodec.bits.ByteVector
@ -20,10 +23,10 @@ import scodec.bits.ByteVector
*/
class ZMQSubscriber(
socket: InetSocketAddress,
hashTxListener: Option[ByteVector => Unit],
hashBlockListener: Option[ByteVector => Unit],
rawTxListener: Option[ByteVector => Unit],
rawBlockListener: Option[ByteVector => Unit])
hashTxListener: Option[DoubleSha256DigestBE => Unit],
hashBlockListener: Option[DoubleSha256DigestBE => Unit],
rawTxListener: Option[Transaction => Unit],
rawBlockListener: Option[Block => Unit])
extends BitcoinSLogger {
private var running = true
@ -125,19 +128,23 @@ class ZMQSubscriber(
notification.foreach {
case HashTx =>
hashTxListener.foreach { f =>
f(ByteVector(body))
val hash = DoubleSha256DigestBE(ByteVector(body))
f(hash)
}
case RawTx =>
rawTxListener.foreach { f =>
f(ByteVector(body))
val tx = Transaction(ByteVector(body))
f(tx)
}
case HashBlock =>
hashBlockListener.foreach { f =>
f(ByteVector(body))
val hash = DoubleSha256DigestBE(ByteVector(body))
f(hash)
}
case RawBlock =>
rawBlockListener.foreach { f =>
f(ByteVector(body))
val block = Block(ByteVector(body))
f(block)
}
}

View file

@ -2,19 +2,38 @@ package org.bitcoins.zmq
import java.net.InetSocketAddress
import org.bitcoins.core.util.BytesUtil
import org.bitcoins.core.config.MainNet
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.scalatest.flatspec.AsyncFlatSpec
import org.slf4j.LoggerFactory
import org.zeromq.{SocketType, ZFrame, ZMQ, ZMsg}
import scodec.bits.ByteVector
import scala.concurrent.Promise
class ZMQSubscriberTest extends AsyncFlatSpec {
private val logger = LoggerFactory.getLogger(this.getClass().toString)
class ZMQSubscriberTest extends AsyncFlatSpec with BitcoinSLogger {
behavior of "ZMQSubscriber"
val rawBlockListener: Option[Block => Unit] = Some {
{ block: Block =>
logger.debug(s"received raw block ${block.hex}")
}
}
val hashBlockListener: Option[DoubleSha256DigestBE => Unit] = Some {
{ hash: DoubleSha256DigestBE =>
logger.debug(s"received raw block hash ${hash.hex}")
}
}
val rawTxListener: Option[Transaction => Unit] = Some {
{ tx: Transaction =>
logger.debug(s"received raw tx ${tx.hex}")
}
}
it must "connect to a regtest instance of a daemon and stream txs/blocks from it" in {
//note for this unit test to pass, you need to setup a bitcoind instance yourself
//and set the bitcoin.conf file to allow for
@ -28,13 +47,13 @@ class ZMQSubscriberTest extends AsyncFlatSpec {
// TODO: In the future this could use the testkit to verify the subscriber by calling generate(1)
zmqSub.start()
Thread.sleep(10000) // 10 seconds
zmqSub.stop
zmqSub.stop()
succeed
}
it must "be able to subscribe to a publisher and read a value" in {
val port = Math.abs(scala.util.Random.nextInt % 14000) + 1000
val port = Math.abs(scala.util.Random.nextInt % 14000) + 1024
val socket = new InetSocketAddress("tcp://127.0.0.1", port)
val context = ZMQ.context(1)
@ -43,10 +62,10 @@ class ZMQSubscriberTest extends AsyncFlatSpec {
val uri = socket.getHostString + ":" + socket.getPort
publisher.bind(uri)
val valuePromise = Promise[String]()
val fakeBlockListener: Option[ByteVector => Unit] = Some { bytes =>
val str = new String(bytes.toArray)
valuePromise.success(str)
val valuePromise = Promise[Array[Byte]]()
val fakeBlockListener: Option[Block => Unit] = Some { block =>
val bytes = block.bytes.toArray
valuePromise.success(bytes)
()
}
@ -54,7 +73,7 @@ class ZMQSubscriberTest extends AsyncFlatSpec {
sub.start()
Thread.sleep(1000)
val testValue = "sweet, sweet satoshis"
val testValue = MainNet.chainParams.genesisBlock.bytes.toArray
val msg = new ZMsg()
msg.add(new ZFrame(RawBlock.topic))
@ -63,34 +82,12 @@ class ZMQSubscriberTest extends AsyncFlatSpec {
val sent = msg.send(publisher)
assert(sent)
valuePromise.future.map { str =>
sub.stop
valuePromise.future.map { bytes =>
sub.stop()
publisher.close()
context.term()
assert(str == testValue)
}
}
val rawBlockListener: Option[ByteVector => Unit] = Some {
{ bytes: ByteVector =>
val hex = BytesUtil.encodeHex(bytes)
logger.debug(s"received raw block ${hex}")
}
}
val hashBlockListener: Option[ByteVector => Unit] = Some {
{ bytes: ByteVector =>
val hex = BytesUtil.encodeHex(bytes)
logger.debug(s"received raw block hash ${hex}")
}
}
val rawTxListener: Option[ByteVector => Unit] = Some {
{ bytes: ByteVector =>
val hex = BytesUtil.encodeHex(bytes)
logger.debug(s"received raw tx ${hex}")
assert(bytes sameElements testValue)
}
}
}