Adding more logging around zmq connection

This commit is contained in:
Chris Stewart 2018-04-27 08:54:42 -05:00
parent 11e365a511
commit d4bef008e5

View File

@ -38,40 +38,46 @@ class ZMQSubscriber(
def start()(implicit ec: ExecutionContext): Future[Unit] = Future {
logger.info("starting zmq")
subscriber.connect(uri)
logger.info("Connection to zmq client successful")
//subscribe to the appropriate feed
hashTxListener.map { _ =>
subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to the transaction hashes from zmq")
}
rawTxListener.map { _ =>
subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to raw transactions from zmq")
}
hashBlockListener.map { _ =>
subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to the hashblock stream from zmq")
}
rawBlockListener.map { _ =>
subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to raw block stream from zmq")
}
run = true
while (run) {
val zmsg = ZMsg.recvMsg(subscriber)
val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET)
val body = zmsg.pop().getData
val processedMsg = processMsg(notificationTypeStr, body)
processedMsg.onFailure {
case err =>
logger.error(err.getMessage)
val connected = subscriber.connect(uri)
if (connected) {
logger.info(s"Connection to zmq client successful to $uri")
//subscribe to the appropriate feed
hashTxListener.map { _ =>
subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to the transaction hashes from zmq")
}
rawTxListener.map { _ =>
subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to raw transactions from zmq")
}
hashBlockListener.map { _ =>
subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to the hashblock stream from zmq")
}
rawBlockListener.map { _ =>
subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to raw block stream from zmq")
}
run = true
while (run) {
val zmsg = ZMsg.recvMsg(subscriber)
val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET)
val body = zmsg.pop().getData
val processedMsg = processMsg(notificationTypeStr, body)
processedMsg.onFailure {
case err =>
logger.error(err.getMessage)
}
}
} else {
logger.info(s"Failed to connect zmq client to $uri")
Future.failed(throw new IllegalArgumentException(s"Failed to connect zmq client to $uri"))
}
}
/**
@ -87,9 +93,10 @@ class ZMQSubscriber(
context.term()
}
/** Processes a message that we received the from the cryptocurrency daemon and then
* applies the appropriate listener to that message.
*/
/**
* Processes a message that we received the from the cryptocurrency daemon and then
* applies the appropriate listener to that message.
*/
private def processMsg(topic: String, body: Seq[Byte])(implicit ec: ExecutionContext): Future[Unit] = {
val notification = ZMQNotification.fromString(topic)
val res: Option[Future[Unit]] = notification.flatMap {