diff --git a/build.sbt b/build.sbt index 1585581a08..4e8c3bc39b 100644 --- a/build.sbt +++ b/build.sbt @@ -1,37 +1,24 @@ +import Deps._ + lazy val commonSettings = Seq( organization := "org.bitcoins", - name := "bitcoin-s-core", version := "0.0.1-SNAPSHOT", scalaVersion := "2.11.7" ) + lazy val appName = "bitcoin-s-core" -lazy val scalaV = "2.11.7" -lazy val slf4jV = "1.7.5" -lazy val logbackV = "1.0.13" -lazy val scalaTestV = "3.0.5" -lazy val scalacheckV = "1.13.4" -lazy val sprayV = "1.3.2" -lazy val bouncyCastleV = "1.55" -lazy val appDependencies = Seq( - "org.scalatest" %% "scalatest" % scalaTestV % "test", - "com.novocode" % "junit-interface" % "0.10" % "test", - "org.scalacheck" %% "scalacheck" % scalacheckV withSources() withJavadoc(), - - ("org.bitcoinj" % "bitcoinj-core" % "0.14.4" % "test").exclude("org.slf4j", "slf4j-api"), - "org.bouncycastle" % "bcprov-jdk15on" % bouncyCastleV, - - "org.slf4j" % "slf4j-api" % slf4jV % "provided", - "ch.qos.logback" % "logback-classic" % logbackV % "test", - - "io.spray" %% "spray-json" % sprayV % "test" -) lazy val root = Project(appName, file(".")).enablePlugins().settings( commonSettings, - libraryDependencies ++= appDependencies + libraryDependencies ++= Deps.root ) +lazy val zmq = Project("bitcoin-s-zmq", file("zmq")).enablePlugins().settings( + commonSettings, + libraryDependencies ++= Deps.zmq +).dependsOn(root) + //test in assembly := {} diff --git a/project/Deps.scala b/project/Deps.scala new file mode 100644 index 0000000000..ecb374ef6c --- /dev/null +++ b/project/Deps.scala @@ -0,0 +1,36 @@ +import sbt._ + + +object Deps { +lazy val scalaV = "2.11.7" +lazy val slf4jV = "1.7.5" +lazy val logbackV = "1.0.13" +lazy val scalaTestV = "3.0.5" +lazy val scalacheckV = "1.13.0" +lazy val sprayV = "1.3.2" +lazy val bouncyCastleV = "1.55" +lazy val zeromqV = "0.4.3" + +lazy val root = Seq( + "org.scalatest" %% "scalatest" % scalaTestV % "test", + "com.novocode" % "junit-interface" % "0.10" % "test", + "org.scalacheck" %% "scalacheck" % scalacheckV withSources() withJavadoc(), + + ("org.bitcoinj" % "bitcoinj-core" % "0.14.4" % "test").exclude("org.slf4j", "slf4j-api"), + "org.bouncycastle" % "bcprov-jdk15on" % bouncyCastleV, + + "org.slf4j" % "slf4j-api" % slf4jV % "provided", + "ch.qos.logback" % "logback-classic" % logbackV, + + "io.spray" %% "spray-json" % sprayV % "test" +) + +lazy val zmq = Seq( + "org.scalatest" %% "scalatest" % scalaTestV % "test", + "org.scalacheck" %% "scalacheck" % scalacheckV withSources() withJavadoc(), + "org.zeromq" % "jeromq" % zeromqV, + "org.slf4j" % "slf4j-api" % slf4jV % "provided", + "ch.qos.logback" % "logback-classic" % logbackV +) + +} diff --git a/src/main/scala/org/bitcoins/core/consensus/Merkle.scala b/src/main/scala/org/bitcoins/core/consensus/Merkle.scala index aea4fda9bf..569a476063 100644 --- a/src/main/scala/org/bitcoins/core/consensus/Merkle.scala +++ b/src/main/scala/org/bitcoins/core/consensus/Merkle.scala @@ -51,11 +51,9 @@ trait Merkle extends BitcoinSLogger { else if (accum.isEmpty) throw new IllegalArgumentException("Should never have sub tree size of zero, this implies there was zero hashes given") else build(accum.reverse, Nil) case h :: h1 :: t => - logger.debug("Subtrees: " + subTrees) val newTree = computeTree(h, h1) build(t, newTree +: accum) case h :: t => - logger.debug("Subtrees: " + subTrees) //means that we have an odd amount of txids, this means we duplicate the last hash in the tree val newTree = computeTree(h, h) build(t, newTree +: accum) diff --git a/src/main/scala/org/bitcoins/core/protocol/blockchain/BlockHeader.scala b/src/main/scala/org/bitcoins/core/protocol/blockchain/BlockHeader.scala index 23356cb7af..a3ee19aa5d 100644 --- a/src/main/scala/org/bitcoins/core/protocol/blockchain/BlockHeader.scala +++ b/src/main/scala/org/bitcoins/core/protocol/blockchain/BlockHeader.scala @@ -60,12 +60,13 @@ sealed trait BlockHeader extends NetworkElement { def merkleRootHash: DoubleSha256Digest - /** Returns the merkle root hash in BIG ENDIAN format. This is not compatible with the bitcoin - * protocol but it is useful for rpc clients and block explorers - * See this link for more info - * [[https://bitcoin.stackexchange.com/questions/2063/why-does-the-bitcoin-protocol-use-the-little-endian-notation]] - * @return - */ + /** + * Returns the merkle root hash in BIG ENDIAN format. This is not compatible with the bitcoin + * protocol but it is useful for rpc clients and block explorers + * See this link for more info + * [[https://bitcoin.stackexchange.com/questions/2063/why-does-the-bitcoin-protocol-use-the-little-endian-notation]] + * @return + */ def merkleRootHashBE: DoubleSha256Digest = merkleRootHash.flip /** diff --git a/src/main/scala/org/bitcoins/core/protocol/blockchain/PartialMerkleTree.scala b/src/main/scala/org/bitcoins/core/protocol/blockchain/PartialMerkleTree.scala index 79eb1b4ba3..1eab05acda 100644 --- a/src/main/scala/org/bitcoins/core/protocol/blockchain/PartialMerkleTree.scala +++ b/src/main/scala/org/bitcoins/core/protocol/blockchain/PartialMerkleTree.scala @@ -54,8 +54,6 @@ sealed trait PartialMerkleTree extends BitcoinSLogger { /** Extracts the txids that were matched inside of the bloom filter used to create this partial merkle tree */ def extractMatches: Seq[DoubleSha256Digest] = { //TODO: This is some really ugly that isn't tail recursive, try to clean this up eventually - logger.debug("Starting bits for extraction: " + bits) - logger.debug("Starting tree: " + tree) def loop( subTree: BinaryTree[DoubleSha256Digest], remainingBits: Seq[Boolean], height: Int, pos: Int, accumMatches: Seq[DoubleSha256Digest]): (Seq[DoubleSha256Digest], Seq[Boolean]) = { @@ -127,9 +125,6 @@ object PartialMerkleTree { */ private def build(txMatches: Seq[(Boolean, DoubleSha256Digest)]): (Seq[Boolean], Seq[DoubleSha256Digest]) = { val maxHeight = calcMaxHeight(txMatches.size) - logger.debug("Tx matches: " + txMatches) - logger.debug("Tx matches size: " + txMatches.size) - logger.debug("max height: " + maxHeight) /** * This loops through our merkle tree building [[bits]] so we can instruct another node how to create the partial merkle tree @@ -142,7 +137,6 @@ object PartialMerkleTree { */ def loop(bits: Seq[Boolean], hashes: Seq[DoubleSha256Digest], height: Int, pos: Int): (Seq[Boolean], Seq[DoubleSha256Digest]) = { val parentOfMatch = matchesTx(maxHeight, maxHeight - height, pos, txMatches) - logger.debug("parent of match: " + parentOfMatch) val newBits = parentOfMatch +: bits if (height == 0 || !parentOfMatch) { //means that we are either at the root of the merkle tree or there is nothing interesting below @@ -179,7 +173,6 @@ object PartialMerkleTree { } else false } val startingPos = pos << inverseHeight - logger.debug("Height: " + inverseHeight + " pos: " + pos + " startingP: " + startingPos) loop(startingPos) } @@ -267,8 +260,6 @@ object PartialMerkleTree { } else (Leaf(remainingHashes.head), remainingHashes.tail, remainingMatches.tail) } } - logger.debug("Original hashes: " + hashes) - logger.debug("Original bits: " + bits) val (tree, remainingHashes, remainingBits) = loop(hashes, bits, 0, 0) //we must have used all the hashes provided to us to reconstruct the partial merkle tree as per BIP37 require(remainingHashes.size == 0, "We should not have any left over hashes after building our partial merkle tree, got: " + remainingHashes) diff --git a/zmq/src/main/scala/org/bitcoins/zmq/ZMQNotification.scala b/zmq/src/main/scala/org/bitcoins/zmq/ZMQNotification.scala new file mode 100644 index 0000000000..8a547bddf9 --- /dev/null +++ b/zmq/src/main/scala/org/bitcoins/zmq/ZMQNotification.scala @@ -0,0 +1,35 @@ +package org.bitcoins.zmq + +/** + * Represents the various notifications we can subscribe + * to from a zmq publisher + * [[https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md#usage]] + */ +sealed abstract class ZMQNotification { + val topic: String +} + +case object HashTx extends ZMQNotification { + override val topic = "hashtx" +} +case object RawTx extends ZMQNotification { + override val topic = "rawtx" +} + +case object HashBlock extends ZMQNotification { + override val topic = "hashblock" +} + +case object RawBlock extends ZMQNotification { + override val topic = "rawblock" +} + +object ZMQNotification { + def fromString(str: String): Option[ZMQNotification] = str match { + case HashTx.topic => Some(HashTx) + case RawTx.topic => Some(RawTx) + case HashBlock.topic => Some(HashBlock) + case RawBlock.topic => Some(RawBlock) + case _ => None + } +} diff --git a/zmq/src/main/scala/org/bitcoins/zmq/ZMQSubscriber.scala b/zmq/src/main/scala/org/bitcoins/zmq/ZMQSubscriber.scala new file mode 100644 index 0000000000..1729e98fa4 --- /dev/null +++ b/zmq/src/main/scala/org/bitcoins/zmq/ZMQSubscriber.scala @@ -0,0 +1,119 @@ +package org.bitcoins.zmq + +import java.net.{ InetSocketAddress, Socket } + +import org.bitcoins.core.crypto.{ DoubleSha256Digest, HashDigest } +import org.bitcoins.core.protocol.blockchain.Block +import org.bitcoins.core.protocol.transaction.Transaction +import org.bitcoins.core.util.BitcoinSLogger +import org.zeromq.{ ZMQ, ZMsg } + +import scala.concurrent.{ ExecutionContext, Future } + +/** + * This class is designed to consume a zmq stream from a cryptocurrency's daemon. + * An example of this is bitcoind. For information on how to setup your coin's conf + * file to be able to consume a zmq stream please see + * [[https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md#usage]] + * [[http://zguide.zeromq.org/java:psenvsub]] + * @param socket + * @param hashTxListener + * @param hashBlockListener + * @param rawTxListener + * @param rawBlockListener + */ +class ZMQSubscriber( + socket: InetSocketAddress, + hashTxListener: Option[Seq[Byte] => Future[Unit]], + hashBlockListener: Option[Seq[Byte] => Future[Unit]], + rawTxListener: Option[Seq[Byte] => Future[Unit]], + rawBlockListener: Option[Seq[Byte] => Future[Unit]]) { + private val logger = BitcoinSLogger.logger + + private var run = false + private val context = ZMQ.context(1) + + private val subscriber = context.socket(ZMQ.SUB) + private val uri = socket.getHostString + ":" + socket.getPort + + def start()(implicit ec: ExecutionContext): Future[Unit] = Future { + logger.info("starting zmq") + subscriber.connect(uri) + logger.info("Connection to zmq client successful") + //subscribe to the appropriate feed + hashTxListener.map { _ => + subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET)) + logger.debug("subscribed to the transaction hashes from zmq") + } + + rawTxListener.map { _ => + subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET)) + logger.debug("subscribed to raw transactions from zmq") + } + + hashBlockListener.map { _ => + subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET)) + logger.debug("subscribed to the hashblock stream from zmq") + } + + rawBlockListener.map { _ => + subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET)) + logger.debug("subscribed to raw block stream from zmq") + } + + run = true + while (run) { + val zmsg = ZMsg.recvMsg(subscriber) + val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET) + val body = zmsg.pop().getData + val processedMsg = processMsg(notificationTypeStr, body) + processedMsg.onFailure { + case err => + logger.error(err.getMessage) + } + } + } + + /** + * Stops running the zmq subscriber and cleans up after zmq + * http://zguide.zeromq.org/java:psenvsub + */ + def stop: Unit = { + //i think this could technically not work, because currently we are blocking + //on Zmsg.recvMsg in our while loop. If we don't get another message we won't + //be able toe evaluate the while loop again. Moving forward with this for now. + run = false + subscriber.close() + context.term() + } + + /** Processes a message that we received the from the cryptocurrency daemon and then + * applies the appropriate listener to that message. + */ + private def processMsg(topic: String, body: Seq[Byte])(implicit ec: ExecutionContext): Future[Unit] = { + val notification = ZMQNotification.fromString(topic) + val res: Option[Future[Unit]] = notification.flatMap { + case HashTx => + hashTxListener.map { f => + f(body) + } + case RawTx => + rawTxListener.map { f => + f(body) + } + case HashBlock => + hashBlockListener.map { f => + f(body) + } + case RawBlock => + rawBlockListener.map { f => + f(body) + } + } + + res match { + case Some(f) => f + case None => Future.successful(Unit) + } + } +} diff --git a/zmq/src/test/scala/org/bitcoins/zmq/ZMQSubscriberTest.scala b/zmq/src/test/scala/org/bitcoins/zmq/ZMQSubscriberTest.scala new file mode 100644 index 0000000000..0a7d72fb6f --- /dev/null +++ b/zmq/src/test/scala/org/bitcoins/zmq/ZMQSubscriberTest.scala @@ -0,0 +1,59 @@ +package org.bitcoins.zmq + +import java.net.InetSocketAddress + +import org.bitcoins.core.crypto.DoubleSha256Digest +import org.bitcoins.core.protocol.blockchain.Block +import org.bitcoins.core.protocol.transaction.Transaction +import org.scalatest.{ FlatSpec, MustMatchers } +import org.slf4j.LoggerFactory + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.DurationInt +import scala.concurrent.{ Await, Future } +class ZMQSubscriberTest extends FlatSpec with MustMatchers { + private val logger = LoggerFactory.getLogger(this.getClass().toString) + "ZMQSubscriber" must "connect to a regtest instance of a daemon and stream txs/blocks from it" in { + //note for this unit test to pass, you need to setup a bitcoind instance yourself + //and set the bitcoin.conf file to allow for + //zmq connections + //see: https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md + val socket = new InetSocketAddress("tcp://127.0.0.1", 28332) + + val zmqSub = new ZMQSubscriber(socket, None, None, rawTxListener, rawBlockListener) + //stupid, doesn't test anything, for now. You need to look at log output to verify this is working + Await.result(zmqSub.start(), 100.seconds) + zmqSub.stop + } + + val rawBlockListener: Option[Seq[Byte] => Future[Unit]] = Some { + { bytes: Seq[Byte] => + val block = Future(Block.fromBytes(bytes)) + block.map { b => + logger.debug(s"received block $b") + Future.successful(Unit) + } + } + } + + val hashBlockListener: Option[Seq[Byte] => Future[Unit]] = Some { + { bytes: Seq[Byte] => + val hash = Future(DoubleSha256Digest.fromBytes(bytes)) + hash.map { h => + logger.debug(s"received block hash $h") + Future.successful(Unit) + } + + } + } + + val rawTxListener: Option[Seq[Byte] => Future[Unit]] = Some { + { bytes: Seq[Byte] => + val txFuture = Future(Transaction.fromBytes(bytes)) + txFuture.map { tx => + logger.debug(s"received tx ${tx}") + Future.successful(Unit) + } + } + } +}