Merge pull request #151 from Christewart/zmq

Zmq
This commit is contained in:
Chris Stewart 2018-04-26 11:13:53 -04:00 committed by GitHub
commit d4fc39f938
7 changed files with 258 additions and 33 deletions

View file

@ -1,37 +1,24 @@
import Deps._
lazy val commonSettings = Seq(
organization := "org.bitcoins",
name := "bitcoin-s-core",
version := "0.0.1-SNAPSHOT",
scalaVersion := "2.11.7"
)
lazy val appName = "bitcoin-s-core"
lazy val scalaV = "2.11.7"
lazy val slf4jV = "1.7.5"
lazy val logbackV = "1.0.13"
lazy val scalaTestV = "3.0.5"
lazy val scalacheckV = "1.13.4"
lazy val sprayV = "1.3.2"
lazy val bouncyCastleV = "1.55"
lazy val appDependencies = Seq(
"org.scalatest" %% "scalatest" % scalaTestV % "test",
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.scalacheck" %% "scalacheck" % scalacheckV withSources() withJavadoc(),
("org.bitcoinj" % "bitcoinj-core" % "0.14.4" % "test").exclude("org.slf4j", "slf4j-api"),
"org.bouncycastle" % "bcprov-jdk15on" % bouncyCastleV,
"org.slf4j" % "slf4j-api" % slf4jV % "provided",
"ch.qos.logback" % "logback-classic" % logbackV % "test",
"io.spray" %% "spray-json" % sprayV % "test"
)
lazy val root = Project(appName, file(".")).enablePlugins().settings(
commonSettings,
libraryDependencies ++= appDependencies
libraryDependencies ++= Deps.root
)
lazy val zmq = Project("bitcoin-s-zmq", file("zmq")).enablePlugins().settings(
commonSettings,
libraryDependencies ++= Deps.zmq
).dependsOn(root)
//test in assembly := {}

36
project/Deps.scala Normal file
View file

@ -0,0 +1,36 @@
import sbt._
object Deps {
lazy val scalaV = "2.11.7"
lazy val slf4jV = "1.7.5"
lazy val logbackV = "1.0.13"
lazy val scalaTestV = "3.0.5"
lazy val scalacheckV = "1.13.0"
lazy val sprayV = "1.3.2"
lazy val bouncyCastleV = "1.55"
lazy val zeromqV = "0.4.3"
lazy val root = Seq(
"org.scalatest" %% "scalatest" % scalaTestV % "test",
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.scalacheck" %% "scalacheck" % scalacheckV withSources() withJavadoc(),
("org.bitcoinj" % "bitcoinj-core" % "0.14.4" % "test").exclude("org.slf4j", "slf4j-api"),
"org.bouncycastle" % "bcprov-jdk15on" % bouncyCastleV,
"org.slf4j" % "slf4j-api" % slf4jV % "provided",
"ch.qos.logback" % "logback-classic" % logbackV,
"io.spray" %% "spray-json" % sprayV % "test"
)
lazy val zmq = Seq(
"org.scalatest" %% "scalatest" % scalaTestV % "test",
"org.scalacheck" %% "scalacheck" % scalacheckV withSources() withJavadoc(),
"org.zeromq" % "jeromq" % zeromqV,
"org.slf4j" % "slf4j-api" % slf4jV % "provided",
"ch.qos.logback" % "logback-classic" % logbackV
)
}

View file

@ -51,11 +51,9 @@ trait Merkle extends BitcoinSLogger {
else if (accum.isEmpty) throw new IllegalArgumentException("Should never have sub tree size of zero, this implies there was zero hashes given")
else build(accum.reverse, Nil)
case h :: h1 :: t =>
logger.debug("Subtrees: " + subTrees)
val newTree = computeTree(h, h1)
build(t, newTree +: accum)
case h :: t =>
logger.debug("Subtrees: " + subTrees)
//means that we have an odd amount of txids, this means we duplicate the last hash in the tree
val newTree = computeTree(h, h)
build(t, newTree +: accum)

View file

@ -54,8 +54,6 @@ sealed trait PartialMerkleTree extends BitcoinSLogger {
/** Extracts the txids that were matched inside of the bloom filter used to create this partial merkle tree */
def extractMatches: Seq[DoubleSha256Digest] = {
//TODO: This is some really ugly that isn't tail recursive, try to clean this up eventually
logger.debug("Starting bits for extraction: " + bits)
logger.debug("Starting tree: " + tree)
def loop(
subTree: BinaryTree[DoubleSha256Digest],
remainingBits: Seq[Boolean], height: Int, pos: Int, accumMatches: Seq[DoubleSha256Digest]): (Seq[DoubleSha256Digest], Seq[Boolean]) = {
@ -127,9 +125,6 @@ object PartialMerkleTree {
*/
private def build(txMatches: Seq[(Boolean, DoubleSha256Digest)]): (Seq[Boolean], Seq[DoubleSha256Digest]) = {
val maxHeight = calcMaxHeight(txMatches.size)
logger.debug("Tx matches: " + txMatches)
logger.debug("Tx matches size: " + txMatches.size)
logger.debug("max height: " + maxHeight)
/**
* This loops through our merkle tree building [[bits]] so we can instruct another node how to create the partial merkle tree
@ -142,7 +137,6 @@ object PartialMerkleTree {
*/
def loop(bits: Seq[Boolean], hashes: Seq[DoubleSha256Digest], height: Int, pos: Int): (Seq[Boolean], Seq[DoubleSha256Digest]) = {
val parentOfMatch = matchesTx(maxHeight, maxHeight - height, pos, txMatches)
logger.debug("parent of match: " + parentOfMatch)
val newBits = parentOfMatch +: bits
if (height == 0 || !parentOfMatch) {
//means that we are either at the root of the merkle tree or there is nothing interesting below
@ -179,7 +173,6 @@ object PartialMerkleTree {
} else false
}
val startingPos = pos << inverseHeight
logger.debug("Height: " + inverseHeight + " pos: " + pos + " startingP: " + startingPos)
loop(startingPos)
}
@ -267,8 +260,6 @@ object PartialMerkleTree {
} else (Leaf(remainingHashes.head), remainingHashes.tail, remainingMatches.tail)
}
}
logger.debug("Original hashes: " + hashes)
logger.debug("Original bits: " + bits)
val (tree, remainingHashes, remainingBits) = loop(hashes, bits, 0, 0)
//we must have used all the hashes provided to us to reconstruct the partial merkle tree as per BIP37
require(remainingHashes.size == 0, "We should not have any left over hashes after building our partial merkle tree, got: " + remainingHashes)

View file

@ -0,0 +1,35 @@
package org.bitcoins.zmq
/**
* Represents the various notifications we can subscribe
* to from a zmq publisher
* [[https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md#usage]]
*/
sealed abstract class ZMQNotification {
val topic: String
}
case object HashTx extends ZMQNotification {
override val topic = "hashtx"
}
case object RawTx extends ZMQNotification {
override val topic = "rawtx"
}
case object HashBlock extends ZMQNotification {
override val topic = "hashblock"
}
case object RawBlock extends ZMQNotification {
override val topic = "rawblock"
}
object ZMQNotification {
def fromString(str: String): Option[ZMQNotification] = str match {
case HashTx.topic => Some(HashTx)
case RawTx.topic => Some(RawTx)
case HashBlock.topic => Some(HashBlock)
case RawBlock.topic => Some(RawBlock)
case _ => None
}
}

View file

@ -0,0 +1,119 @@
package org.bitcoins.zmq
import java.net.{ InetSocketAddress, Socket }
import org.bitcoins.core.crypto.{ DoubleSha256Digest, HashDigest }
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.BitcoinSLogger
import org.zeromq.{ ZMQ, ZMsg }
import scala.concurrent.{ ExecutionContext, Future }
/**
* This class is designed to consume a zmq stream from a cryptocurrency's daemon.
* An example of this is bitcoind. For information on how to setup your coin's conf
* file to be able to consume a zmq stream please see
* [[https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md#usage]]
* [[http://zguide.zeromq.org/java:psenvsub]]
* @param socket
* @param hashTxListener
* @param hashBlockListener
* @param rawTxListener
* @param rawBlockListener
*/
class ZMQSubscriber(
socket: InetSocketAddress,
hashTxListener: Option[Seq[Byte] => Future[Unit]],
hashBlockListener: Option[Seq[Byte] => Future[Unit]],
rawTxListener: Option[Seq[Byte] => Future[Unit]],
rawBlockListener: Option[Seq[Byte] => Future[Unit]]) {
private val logger = BitcoinSLogger.logger
private var run = false
private val context = ZMQ.context(1)
private val subscriber = context.socket(ZMQ.SUB)
private val uri = socket.getHostString + ":" + socket.getPort
def start()(implicit ec: ExecutionContext): Future[Unit] = Future {
logger.info("starting zmq")
subscriber.connect(uri)
logger.info("Connection to zmq client successful")
//subscribe to the appropriate feed
hashTxListener.map { _ =>
subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to the transaction hashes from zmq")
}
rawTxListener.map { _ =>
subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to raw transactions from zmq")
}
hashBlockListener.map { _ =>
subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to the hashblock stream from zmq")
}
rawBlockListener.map { _ =>
subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to raw block stream from zmq")
}
run = true
while (run) {
val zmsg = ZMsg.recvMsg(subscriber)
val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET)
val body = zmsg.pop().getData
val processedMsg = processMsg(notificationTypeStr, body)
processedMsg.onFailure {
case err =>
logger.error(err.getMessage)
}
}
}
/**
* Stops running the zmq subscriber and cleans up after zmq
* http://zguide.zeromq.org/java:psenvsub
*/
def stop: Unit = {
//i think this could technically not work, because currently we are blocking
//on Zmsg.recvMsg in our while loop. If we don't get another message we won't
//be able toe evaluate the while loop again. Moving forward with this for now.
run = false
subscriber.close()
context.term()
}
/** Processes a message that we received the from the cryptocurrency daemon and then
* applies the appropriate listener to that message.
*/
private def processMsg(topic: String, body: Seq[Byte])(implicit ec: ExecutionContext): Future[Unit] = {
val notification = ZMQNotification.fromString(topic)
val res: Option[Future[Unit]] = notification.flatMap {
case HashTx =>
hashTxListener.map { f =>
f(body)
}
case RawTx =>
rawTxListener.map { f =>
f(body)
}
case HashBlock =>
hashBlockListener.map { f =>
f(body)
}
case RawBlock =>
rawBlockListener.map { f =>
f(body)
}
}
res match {
case Some(f) => f
case None => Future.successful(Unit)
}
}
}

View file

@ -0,0 +1,59 @@
package org.bitcoins.zmq
import java.net.InetSocketAddress
import org.bitcoins.core.crypto.DoubleSha256Digest
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction
import org.scalatest.{ FlatSpec, MustMatchers }
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ Await, Future }
class ZMQSubscriberTest extends FlatSpec with MustMatchers {
private val logger = LoggerFactory.getLogger(this.getClass().toString)
"ZMQSubscriber" must "connect to a regtest instance of a daemon and stream txs/blocks from it" in {
//note for this unit test to pass, you need to setup a bitcoind instance yourself
//and set the bitcoin.conf file to allow for
//zmq connections
//see: https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md
val socket = new InetSocketAddress("tcp://127.0.0.1", 28332)
val zmqSub = new ZMQSubscriber(socket, None, None, rawTxListener, rawBlockListener)
//stupid, doesn't test anything, for now. You need to look at log output to verify this is working
Await.result(zmqSub.start(), 100.seconds)
zmqSub.stop
}
val rawBlockListener: Option[Seq[Byte] => Future[Unit]] = Some {
{ bytes: Seq[Byte] =>
val block = Future(Block.fromBytes(bytes))
block.map { b =>
logger.debug(s"received block $b")
Future.successful(Unit)
}
}
}
val hashBlockListener: Option[Seq[Byte] => Future[Unit]] = Some {
{ bytes: Seq[Byte] =>
val hash = Future(DoubleSha256Digest.fromBytes(bytes))
hash.map { h =>
logger.debug(s"received block hash $h")
Future.successful(Unit)
}
}
}
val rawTxListener: Option[Seq[Byte] => Future[Unit]] = Some {
{ bytes: Seq[Byte] =>
val txFuture = Future(Transaction.fromBytes(bytes))
txFuture.map { tx =>
logger.debug(s"received tx ${tx}")
Future.successful(Unit)
}
}
}
}