POC for threading model in ZMQ subscriber

This commit is contained in:
Noah Cornwell 2018-04-30 15:31:24 -04:00 committed by Chris Stewart
parent 7698f95725
commit 294ed10361

View File

@ -24,24 +24,23 @@ import scala.concurrent.{ ExecutionContext, Future }
*/
class ZMQSubscriber(
socket: InetSocketAddress,
hashTxListener: Option[Seq[Byte] => Future[Unit]],
hashBlockListener: Option[Seq[Byte] => Future[Unit]],
rawTxListener: Option[Seq[Byte] => Future[Unit]],
rawBlockListener: Option[Seq[Byte] => Future[Unit]]) {
hashTxListener: Option[Seq[Byte] => Unit],
hashBlockListener: Option[Seq[Byte] => Unit],
rawTxListener: Option[Seq[Byte] => Unit],
rawBlockListener: Option[Seq[Byte] => Unit]) {
private val logger = BitcoinSLogger.logger
private var run = false
private var run = true
private val context = ZMQ.context(1)
private val subscriber = context.socket(ZMQ.SUB)
private val uri = socket.getHostString + ":" + socket.getPort
def start()(implicit ec: ExecutionContext): Future[Unit] = Future {
logger.info("starting zmq")
val connected = subscriber.connect(uri)
if (connected) {
logger.info(s"Connection to zmq client successful to $uri")
//subscribe to the appropriate feed
private case object SubscriberRunnable = new Runnable {
override def run(): Unit = {
val connected = subscriber.connect(uri)
hashTxListener.map { _ =>
subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to the transaction hashes from zmq")
@ -62,22 +61,26 @@ class ZMQSubscriber(
logger.debug("subscribed to raw block stream from zmq")
}
run = true
while (run) {
val zmsg = ZMsg.recvMsg(subscriber)
val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET)
val body = zmsg.pop().getData
val processedMsg = processMsg(notificationTypeStr, body)
processedMsg.onFailure {
case err =>
logger.error(err.getMessage)
val zmsg = ZMsg.recvMsg(subscriber, , ZMQ.NOBLOCK)
if (zmsg != null) {
val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET)
val body = zmsg.pop().getData
processMsg(notificationTypeStr, body)
} else {
Thread.sleep(1)
}
}
} else {
logger.info(s"Failed to connect zmq client to $uri")
Future.failed(throw new IllegalArgumentException(s"Failed to connect zmq client to $uri"))
}
}
private val subscriberThread = new Thread(SubscriberRunnable)
subscriberThread.setName("ZMQSubscriber-thread")
subscriberThread.setDaemon(true)
def start(): Unit = {
logger.info("starting zmq")
subscriberThread.start()
}
/**
@ -97,30 +100,26 @@ class ZMQSubscriber(
* Processes a message that we received the from the cryptocurrency daemon and then
* applies the appropriate listener to that message.
*/
private def processMsg(topic: String, body: Seq[Byte])(implicit ec: ExecutionContext): Future[Unit] = {
private def processMsg(topic: String, body: Seq[Byte]): Unit = {
val notification = ZMQNotification.fromString(topic)
val res: Option[Future[Unit]] = notification.flatMap {
notification.foreach {
case HashTx =>
hashTxListener.map { f =>
hashTxListener.foreach { f =>
f(body)
}
case RawTx =>
rawTxListener.map { f =>
rawTxListener.foreach { f =>
f(body)
}
case HashBlock =>
hashBlockListener.map { f =>
hashBlockListener.foreach { f =>
f(body)
}
case RawBlock =>
rawBlockListener.map { f =>
rawBlockListener.foreach { f =>
f(body)
}
}
res match {
case Some(f) => f
case None => Future.successful(Unit)
}
}
}