Init for zmq, successfully parsing blocks from bitcoind

MVP working for zmq notifications

removing logging, making block header generators less likely to fail

subscribing earlier in zmq life cycle

refactoring some zmq stuff to remove excess Futures

Adding error condition to processMsg loop

changing error message in processMsg loop

refactoring ZMQSubscriber api to just take in Seq[Byte] functions

cleaning up nits, adding documentation, list shortcomings of current iteration
This commit is contained in:
Chris Stewart 2018-04-17 09:21:20 -05:00
parent 4e938065f2
commit 11e365a511
8 changed files with 265 additions and 39 deletions

View file

@ -1,37 +1,24 @@
import Deps._
lazy val commonSettings = Seq(
organization := "org.bitcoins",
name := "bitcoin-s-core",
version := "0.0.1-SNAPSHOT",
scalaVersion := "2.11.7"
)
lazy val appName = "bitcoin-s-core"
lazy val scalaV = "2.11.7"
lazy val slf4jV = "1.7.5"
lazy val logbackV = "1.0.13"
lazy val scalaTestV = "3.0.5"
lazy val scalacheckV = "1.13.4"
lazy val sprayV = "1.3.2"
lazy val bouncyCastleV = "1.55"
lazy val appDependencies = Seq(
"org.scalatest" %% "scalatest" % scalaTestV % "test",
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.scalacheck" %% "scalacheck" % scalacheckV withSources() withJavadoc(),
("org.bitcoinj" % "bitcoinj-core" % "0.14.4" % "test").exclude("org.slf4j", "slf4j-api"),
"org.bouncycastle" % "bcprov-jdk15on" % bouncyCastleV,
"org.slf4j" % "slf4j-api" % slf4jV % "provided",
"ch.qos.logback" % "logback-classic" % logbackV % "test",
"io.spray" %% "spray-json" % sprayV % "test"
)
lazy val root = Project(appName, file(".")).enablePlugins().settings(
commonSettings,
libraryDependencies ++= appDependencies
libraryDependencies ++= Deps.root
)
lazy val zmq = Project("bitcoin-s-zmq", file("zmq")).enablePlugins().settings(
commonSettings,
libraryDependencies ++= Deps.zmq
).dependsOn(root)
//test in assembly := {}

36
project/Deps.scala Normal file
View file

@ -0,0 +1,36 @@
import sbt._
object Deps {
lazy val scalaV = "2.11.7"
lazy val slf4jV = "1.7.5"
lazy val logbackV = "1.0.13"
lazy val scalaTestV = "3.0.5"
lazy val scalacheckV = "1.13.0"
lazy val sprayV = "1.3.2"
lazy val bouncyCastleV = "1.55"
lazy val zeromqV = "0.4.3"
lazy val root = Seq(
"org.scalatest" %% "scalatest" % scalaTestV % "test",
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.scalacheck" %% "scalacheck" % scalacheckV withSources() withJavadoc(),
("org.bitcoinj" % "bitcoinj-core" % "0.14.4" % "test").exclude("org.slf4j", "slf4j-api"),
"org.bouncycastle" % "bcprov-jdk15on" % bouncyCastleV,
"org.slf4j" % "slf4j-api" % slf4jV % "provided",
"ch.qos.logback" % "logback-classic" % logbackV,
"io.spray" %% "spray-json" % sprayV % "test"
)
lazy val zmq = Seq(
"org.scalatest" %% "scalatest" % scalaTestV % "test",
"org.scalacheck" %% "scalacheck" % scalacheckV withSources() withJavadoc(),
"org.zeromq" % "jeromq" % zeromqV,
"org.slf4j" % "slf4j-api" % slf4jV % "provided",
"ch.qos.logback" % "logback-classic" % logbackV
)
}

View file

@ -51,11 +51,9 @@ trait Merkle extends BitcoinSLogger {
else if (accum.isEmpty) throw new IllegalArgumentException("Should never have sub tree size of zero, this implies there was zero hashes given")
else build(accum.reverse, Nil)
case h :: h1 :: t =>
logger.debug("Subtrees: " + subTrees)
val newTree = computeTree(h, h1)
build(t, newTree +: accum)
case h :: t =>
logger.debug("Subtrees: " + subTrees)
//means that we have an odd amount of txids, this means we duplicate the last hash in the tree
val newTree = computeTree(h, h)
build(t, newTree +: accum)

View file

@ -60,12 +60,13 @@ sealed trait BlockHeader extends NetworkElement {
def merkleRootHash: DoubleSha256Digest
/** Returns the merkle root hash in BIG ENDIAN format. This is not compatible with the bitcoin
* protocol but it is useful for rpc clients and block explorers
* See this link for more info
* [[https://bitcoin.stackexchange.com/questions/2063/why-does-the-bitcoin-protocol-use-the-little-endian-notation]]
* @return
*/
/**
* Returns the merkle root hash in BIG ENDIAN format. This is not compatible with the bitcoin
* protocol but it is useful for rpc clients and block explorers
* See this link for more info
* [[https://bitcoin.stackexchange.com/questions/2063/why-does-the-bitcoin-protocol-use-the-little-endian-notation]]
* @return
*/
def merkleRootHashBE: DoubleSha256Digest = merkleRootHash.flip
/**

View file

@ -54,8 +54,6 @@ sealed trait PartialMerkleTree extends BitcoinSLogger {
/** Extracts the txids that were matched inside of the bloom filter used to create this partial merkle tree */
def extractMatches: Seq[DoubleSha256Digest] = {
//TODO: This is some really ugly that isn't tail recursive, try to clean this up eventually
logger.debug("Starting bits for extraction: " + bits)
logger.debug("Starting tree: " + tree)
def loop(
subTree: BinaryTree[DoubleSha256Digest],
remainingBits: Seq[Boolean], height: Int, pos: Int, accumMatches: Seq[DoubleSha256Digest]): (Seq[DoubleSha256Digest], Seq[Boolean]) = {
@ -127,9 +125,6 @@ object PartialMerkleTree {
*/
private def build(txMatches: Seq[(Boolean, DoubleSha256Digest)]): (Seq[Boolean], Seq[DoubleSha256Digest]) = {
val maxHeight = calcMaxHeight(txMatches.size)
logger.debug("Tx matches: " + txMatches)
logger.debug("Tx matches size: " + txMatches.size)
logger.debug("max height: " + maxHeight)
/**
* This loops through our merkle tree building [[bits]] so we can instruct another node how to create the partial merkle tree
@ -142,7 +137,6 @@ object PartialMerkleTree {
*/
def loop(bits: Seq[Boolean], hashes: Seq[DoubleSha256Digest], height: Int, pos: Int): (Seq[Boolean], Seq[DoubleSha256Digest]) = {
val parentOfMatch = matchesTx(maxHeight, maxHeight - height, pos, txMatches)
logger.debug("parent of match: " + parentOfMatch)
val newBits = parentOfMatch +: bits
if (height == 0 || !parentOfMatch) {
//means that we are either at the root of the merkle tree or there is nothing interesting below
@ -179,7 +173,6 @@ object PartialMerkleTree {
} else false
}
val startingPos = pos << inverseHeight
logger.debug("Height: " + inverseHeight + " pos: " + pos + " startingP: " + startingPos)
loop(startingPos)
}
@ -267,8 +260,6 @@ object PartialMerkleTree {
} else (Leaf(remainingHashes.head), remainingHashes.tail, remainingMatches.tail)
}
}
logger.debug("Original hashes: " + hashes)
logger.debug("Original bits: " + bits)
val (tree, remainingHashes, remainingBits) = loop(hashes, bits, 0, 0)
//we must have used all the hashes provided to us to reconstruct the partial merkle tree as per BIP37
require(remainingHashes.size == 0, "We should not have any left over hashes after building our partial merkle tree, got: " + remainingHashes)

View file

@ -0,0 +1,35 @@
package org.bitcoins.zmq
/**
* Represents the various notifications we can subscribe
* to from a zmq publisher
* [[https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md#usage]]
*/
sealed abstract class ZMQNotification {
val topic: String
}
case object HashTx extends ZMQNotification {
override val topic = "hashtx"
}
case object RawTx extends ZMQNotification {
override val topic = "rawtx"
}
case object HashBlock extends ZMQNotification {
override val topic = "hashblock"
}
case object RawBlock extends ZMQNotification {
override val topic = "rawblock"
}
object ZMQNotification {
def fromString(str: String): Option[ZMQNotification] = str match {
case HashTx.topic => Some(HashTx)
case RawTx.topic => Some(RawTx)
case HashBlock.topic => Some(HashBlock)
case RawBlock.topic => Some(RawBlock)
case _ => None
}
}

View file

@ -0,0 +1,119 @@
package org.bitcoins.zmq
import java.net.{ InetSocketAddress, Socket }
import org.bitcoins.core.crypto.{ DoubleSha256Digest, HashDigest }
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.BitcoinSLogger
import org.zeromq.{ ZMQ, ZMsg }
import scala.concurrent.{ ExecutionContext, Future }
/**
* This class is designed to consume a zmq stream from a cryptocurrency's daemon.
* An example of this is bitcoind. For information on how to setup your coin's conf
* file to be able to consume a zmq stream please see
* [[https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md#usage]]
* [[http://zguide.zeromq.org/java:psenvsub]]
* @param socket
* @param hashTxListener
* @param hashBlockListener
* @param rawTxListener
* @param rawBlockListener
*/
class ZMQSubscriber(
socket: InetSocketAddress,
hashTxListener: Option[Seq[Byte] => Future[Unit]],
hashBlockListener: Option[Seq[Byte] => Future[Unit]],
rawTxListener: Option[Seq[Byte] => Future[Unit]],
rawBlockListener: Option[Seq[Byte] => Future[Unit]]) {
private val logger = BitcoinSLogger.logger
private var run = false
private val context = ZMQ.context(1)
private val subscriber = context.socket(ZMQ.SUB)
private val uri = socket.getHostString + ":" + socket.getPort
def start()(implicit ec: ExecutionContext): Future[Unit] = Future {
logger.info("starting zmq")
subscriber.connect(uri)
logger.info("Connection to zmq client successful")
//subscribe to the appropriate feed
hashTxListener.map { _ =>
subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to the transaction hashes from zmq")
}
rawTxListener.map { _ =>
subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to raw transactions from zmq")
}
hashBlockListener.map { _ =>
subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to the hashblock stream from zmq")
}
rawBlockListener.map { _ =>
subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET))
logger.debug("subscribed to raw block stream from zmq")
}
run = true
while (run) {
val zmsg = ZMsg.recvMsg(subscriber)
val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET)
val body = zmsg.pop().getData
val processedMsg = processMsg(notificationTypeStr, body)
processedMsg.onFailure {
case err =>
logger.error(err.getMessage)
}
}
}
/**
* Stops running the zmq subscriber and cleans up after zmq
* http://zguide.zeromq.org/java:psenvsub
*/
def stop: Unit = {
//i think this could technically not work, because currently we are blocking
//on Zmsg.recvMsg in our while loop. If we don't get another message we won't
//be able toe evaluate the while loop again. Moving forward with this for now.
run = false
subscriber.close()
context.term()
}
/** Processes a message that we received the from the cryptocurrency daemon and then
* applies the appropriate listener to that message.
*/
private def processMsg(topic: String, body: Seq[Byte])(implicit ec: ExecutionContext): Future[Unit] = {
val notification = ZMQNotification.fromString(topic)
val res: Option[Future[Unit]] = notification.flatMap {
case HashTx =>
hashTxListener.map { f =>
f(body)
}
case RawTx =>
rawTxListener.map { f =>
f(body)
}
case HashBlock =>
hashBlockListener.map { f =>
f(body)
}
case RawBlock =>
rawBlockListener.map { f =>
f(body)
}
}
res match {
case Some(f) => f
case None => Future.successful(Unit)
}
}
}

View file

@ -0,0 +1,59 @@
package org.bitcoins.zmq
import java.net.InetSocketAddress
import org.bitcoins.core.crypto.DoubleSha256Digest
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction
import org.scalatest.{ FlatSpec, MustMatchers }
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ Await, Future }
class ZMQSubscriberTest extends FlatSpec with MustMatchers {
private val logger = LoggerFactory.getLogger(this.getClass().toString)
"ZMQSubscriber" must "connect to a regtest instance of a daemon and stream txs/blocks from it" in {
//note for this unit test to pass, you need to setup a bitcoind instance yourself
//and set the bitcoin.conf file to allow for
//zmq connections
//see: https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md
val socket = new InetSocketAddress("tcp://127.0.0.1", 28332)
val zmqSub = new ZMQSubscriber(socket, None, None, rawTxListener, rawBlockListener)
//stupid, doesn't test anything, for now. You need to look at log output to verify this is working
Await.result(zmqSub.start(), 100.seconds)
zmqSub.stop
}
val rawBlockListener: Option[Seq[Byte] => Future[Unit]] = Some {
{ bytes: Seq[Byte] =>
val block = Future(Block.fromBytes(bytes))
block.map { b =>
logger.debug(s"received block $b")
Future.successful(Unit)
}
}
}
val hashBlockListener: Option[Seq[Byte] => Future[Unit]] = Some {
{ bytes: Seq[Byte] =>
val hash = Future(DoubleSha256Digest.fromBytes(bytes))
hash.map { h =>
logger.debug(s"received block hash $h")
Future.successful(Unit)
}
}
}
val rawTxListener: Option[Seq[Byte] => Future[Unit]] = Some {
{ bytes: Seq[Byte] =>
val txFuture = Future(Transaction.fromBytes(bytes))
txFuture.map { tx =>
logger.debug(s"received tx ${tx}")
Future.successful(Unit)
}
}
}
}