From 46280c9e597e667c909f80d5f0441eac2e24bfe8 Mon Sep 17 00:00:00 2001 From: Torkel Rogstad Date: Fri, 2 Aug 2019 16:22:20 +0200 Subject: [PATCH] Add functionality for updating SPV node bloom filter (#585) * Add functionality for updating SPV node bloom filter * Add SPV node shutdown to exit hook * Clean up traits/case classes * Change fixture in WalletBloomTest * Fix logging bug in TransactionProcessing * Add MerkleBuffers In this commit we add MerkleBuffers, which is an object that lets us aggregate merkleblocks with their corresponding transactions before sending them out. This is global, mutable state (bad!) but it's a working solution for now. * Use TestAsyncUtil * Add MerkleBuffers test * Send getdata if receiving single header * Change awaitSync to use block count * Fix UpdateBloomFilterTest * Add more logging of chain/headers validation * Send getdata for all blocks * Nits: Scaladocs, comments, toString --- .../main/scala/org/bitcoins/server/Main.scala | 17 +- .../chain/blockchain/Blockchain.scala | 14 +- .../chain/blockchain/ChainHandler.scala | 40 +++- .../chain/validation/TipUpdateResult.scala | 5 +- .../org/bitcoins/core/p2p/Inventory.scala | 31 +-- .../bitcoins/core/p2p/NetworkPayload.scala | 93 ++++----- .../bitcoins/node/UpdateBloomFilterTest.scala | 189 ++++++++++++++++++ .../networking/peer/MerkleBuffersTest.scala | 82 ++++++++ .../scala/org/bitcoins/node/SpvNode.scala | 35 +++- .../bitcoins/node/networking/P2PClient.scala | 6 +- .../networking/peer/DataMessageHandler.scala | 78 ++++++-- .../node/networking/peer/MerkleBuffers.scala | 115 +++++++++++ .../networking/peer/PeerMessageSender.scala | 29 +++ .../bitcoins/testkit/node/NodeTestUtil.scala | 26 ++- .../org/bitcoins/wallet/WalletBloomTest.scala | 9 +- .../internal/TransactionProcessing.scala | 2 +- 16 files changed, 643 insertions(+), 128 deletions(-) create mode 100644 node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala create mode 100644 node-test/src/test/scala/org/bitcoins/node/networking/peer/MerkleBuffersTest.scala create mode 100644 node/src/main/scala/org/bitcoins/node/networking/peer/MerkleBuffers.scala diff --git a/app/server/src/main/scala/org/bitcoins/server/Main.scala b/app/server/src/main/scala/org/bitcoins/server/Main.scala index e7534ae365..ae8f78bf46 100644 --- a/app/server/src/main/scala/org/bitcoins/server/Main.scala +++ b/app/server/src/main/scala/org/bitcoins/server/Main.scala @@ -43,11 +43,6 @@ object Main extends App { implicit val system = ActorSystem("bitcoin-s") import system.dispatcher - sys.addShutdownHook { - logger.error(s"Exiting process") - system.terminate().foreach(_ => logger.info(s"Actor system terminated")) - } - /** Log the given message, shut down the actor system and quit.
*/ def error(message: Any): Nothing = { logger.error(s"FATAL: $message") @@ -128,7 +123,17 @@ object Main extends App { Seq(walletRoutes, nodeRoutes, chainRoutes)) server.start() } - } yield start + } yield { + + sys.addShutdownHook { + logger.error(s"Exiting process") + + node.stop().foreach(_ => logger.info(s"Stopped SPV node")) + system.terminate().foreach(_ => logger.info(s"Actor system terminated")) + } + + start + } startFut.failed.foreach { err => logger.info(s"Error on server startup!", err) diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/Blockchain.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/Blockchain.scala index 7585d4b447..cfbc0b7d00 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/Blockchain.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/Blockchain.scala @@ -63,6 +63,8 @@ object Blockchain extends ChainVerificationLogger { def connectTip(header: BlockHeader, blockHeaderDAO: BlockHeaderDAO)( implicit ec: ExecutionContext, conf: ChainAppConfig): Future[BlockchainUpdate] = { + logger.debug( + s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain") //get all competing chains we have val blockchainsF: Future[Vector[Blockchain]] = @@ -76,8 +78,8 @@ object Blockchain extends ChainVerificationLogger { blockchain.find(_.hashBE == header.previousBlockHashBE) prevBlockHeaderOpt match { case None => - logger.debug( - s"No common ancestor found in the chain to connect to ${header.hashBE}") + logger.warn( + s"No common ancestor found in the chain to connect header=${header.hashBE.hex}") val err = TipUpdateResult.BadPreviousBlockHash(header) val failed = BlockchainUpdate.Failed(blockchain = blockchain, failedHeader = header, @@ -86,8 +88,8 @@ object Blockchain extends ChainVerificationLogger { case Some(prevBlockHeader) => //found a header to connect to! 
- logger.debug( - s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain") + logger.trace( + s"Found ancestor=${prevBlockHeader.hashBE.hex} for header=${header.hashBE.hex}") val tipResultF = TipValidation.checkNewTip(newPotentialTip = header, currentTip = prevBlockHeader, @@ -96,10 +98,14 @@ object Blockchain extends ChainVerificationLogger { tipResultF.map { tipResult => tipResult match { case TipUpdateResult.Success(headerDb) => + logger.debug( + s"Successfully verified=${headerDb.hashBE.hex}, connecting to chain") val newChain = Blockchain.fromHeaders(headerDb +: blockchain.headers) BlockchainUpdate.Successful(newChain, headerDb) case fail: TipUpdateResult.Failure => + logger.warn( + s"Could not verify header=${header.hashBE.hex}, reason=$fail") BlockchainUpdate.Failed(blockchain, header, fail) } } diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala index cb8a966d64..e6cf02d48e 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala @@ -8,6 +8,11 @@ import org.bitcoins.core.protocol.blockchain.BlockHeader import scala.concurrent.{ExecutionContext, Future} import org.bitcoins.db.ChainVerificationLogger +import org.bitcoins.chain.validation.TipUpdateResult.BadNonce +import org.bitcoins.chain.validation.TipUpdateResult.BadPOW +import org.bitcoins.chain.validation.TipUpdateResult.BadPreviousBlockHash +import org.bitcoins.core.util.FutureUtil +import org.bitcoins.chain.validation.TipUpdateResult /** * Chain Handler is meant to be the reference implementation @@ -41,6 +46,8 @@ case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)( override def processHeader(header: BlockHeader)( implicit ec: ExecutionContext): Future[ChainHandler] = { + logger.debug( + s"Processing header=${header.hashBE.hex}, previousHash=${header.previousBlockHashBE.hex}") val blockchainUpdateF = Blockchain.connectTip(header, blockHeaderDAO) @@ -52,14 +59,17 @@ case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)( val createdF = blockHeaderDAO.create(updatedHeader) createdF.map { header => logger.debug( - s"Connected new header to blockchain, height=${header.height} hash=${header.hashBE}") + s"Connected new header to blockchain, height=${header.height} hash=${header.hashBE.hex}") ChainHandler(blockHeaderDAO) } case BlockchainUpdate.Failed(_, _, reason) => val errMsg = s"Failed to add header to chain, header=${header.hashBE.hex} reason=${reason}" logger.warn(errMsg) - Future.failed(new RuntimeException(errMsg)) + // potential chain split happening, let's log what's going on + logTipConnectionFailure(reason).flatMap { _ => + Future.failed(new RuntimeException(errMsg)) + } } blockchainUpdateF.failed.foreach { err => @@ -71,6 +81,32 @@ case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)( newHandlerF } + /** Logs a tip connection failure by querying local chain state + * and comparing it to the received `TipUpdateResult` + */ + private def logTipConnectionFailure(failure: TipUpdateResult.Failure)( + implicit ec: ExecutionContext): Future[Unit] = { + failure match { + case _ @(_: BadPOW | _: BadNonce) => + // TODO: Log this in a meaningful way + FutureUtil.unit + case _: BadPreviousBlockHash => + blockHeaderDAO.chainTips.map { tips => + if (tips.length > 1) { + logger.warn { + s"We have multiple (${tips.length}) , competing chainTips=${tips + .map(_.hashBE.hex) + 
.mkString("[", ",", "]")}" + } + } else { + logger.warn( + s"We don't have competing chainTips. Most recent, valid header=${tips.head.hashBE.hex}") + } + } + } + + } + /** * @inheritdoc */ diff --git a/chain/src/main/scala/org/bitcoins/chain/validation/TipUpdateResult.scala b/chain/src/main/scala/org/bitcoins/chain/validation/TipUpdateResult.scala index 167e14ee38..a2a16b7599 100644 --- a/chain/src/main/scala/org/bitcoins/chain/validation/TipUpdateResult.scala +++ b/chain/src/main/scala/org/bitcoins/chain/validation/TipUpdateResult.scala @@ -18,7 +18,10 @@ object TipUpdateResult { } /** Means that [[org.bitcoins.core.protocol.blockchain.BlockHeader.previousBlockHashBE previousBlockHashBE]] was incorrect */ - case class BadPreviousBlockHash(header: BlockHeader) extends Failure + case class BadPreviousBlockHash(header: BlockHeader) extends Failure { + override def toString: String = + s"BadPreviousBlockHash(hash=${header.hashBE}, previous=${header.previousBlockHashBE})" + } /** Means that [[org.bitcoins.core.protocol.blockchain.BlockHeader.nBits nBits]] was invalid */ case class BadPOW(header: BlockHeader) extends Failure diff --git a/core/src/main/scala/org/bitcoins/core/p2p/Inventory.scala b/core/src/main/scala/org/bitcoins/core/p2p/Inventory.scala index b16dc1ca5e..1bb0ab77fd 100644 --- a/core/src/main/scala/org/bitcoins/core/p2p/Inventory.scala +++ b/core/src/main/scala/org/bitcoins/core/p2p/Inventory.scala @@ -8,40 +8,21 @@ import scodec.bits.ByteVector /** * These are used as unique identifiers inside the peer-to-peer network + * + * @param typeIdentifier The type of object which was hashed + * @param hash SHA256(SHA256()) hash of the object in internal byte order. + * * @see [[https://bitcoin.org/en/developer-reference#term-inventory]] */ -trait Inventory extends NetworkElement { - - /** - * The type of object which was hashed - * @return - */ - def typeIdentifier: TypeIdentifier - - /** - * SHA256(SHA256()) hash of the object in internal byte order. 
- * @return - */ - def hash: DoubleSha256Digest +case class Inventory(typeIdentifier: TypeIdentifier, hash: DoubleSha256Digest) + extends NetworkElement { override def bytes: ByteVector = RawInventorySerializer.write(this) - - override def toString(): String = s"Inventory($typeIdentifier, $hash)" } object Inventory extends Factory[Inventory] { - private case class InventoryImpl( - typeIdentifier: TypeIdentifier, - hash: DoubleSha256Digest) - extends Inventory - override def fromBytes(bytes: ByteVector): Inventory = RawInventorySerializer.read(bytes) - def apply( - typeIdentifier: TypeIdentifier, - hash: DoubleSha256Digest): Inventory = { - InventoryImpl(typeIdentifier, hash) - } } diff --git a/core/src/main/scala/org/bitcoins/core/p2p/NetworkPayload.scala b/core/src/main/scala/org/bitcoins/core/p2p/NetworkPayload.scala index ba5a95b7db..938afcd4e7 100644 --- a/core/src/main/scala/org/bitcoins/core/p2p/NetworkPayload.scala +++ b/core/src/main/scala/org/bitcoins/core/p2p/NetworkPayload.scala @@ -17,6 +17,7 @@ import org.bitcoins.core.config.NetworkParameters import java.net.InetSocketAddress import org.bitcoins.core.number.UInt32 import org.bitcoins.core.bloom.BloomFlag +import org.bitcoins.core.crypto.HashDigest /** * Trait that represents a payload for a message on the Bitcoin p2p network @@ -43,29 +44,21 @@ sealed trait DataPayload extends NetworkPayload /** * The block message transmits a single serialized block * + * @param block The block being transmitted inside of this message + * * @see [[https://bitcoin.org/en/developer-reference#block]] */ -trait BlockMessage extends DataPayload { - - /** - * The block being transmitted inside of this [[BlockMessage]] - */ - def block: Block - - override def commandName = NetworkPayload.blockCommandName +case class BlockMessage(block: Block) extends DataPayload { + override val commandName = NetworkPayload.blockCommandName override def bytes: ByteVector = RawBlockMessageSerializer.write(this) } object BlockMessage extends Factory[BlockMessage] { - private case class BlockMessageImpl(block: Block) extends BlockMessage - def fromBytes(bytes: ByteVector): BlockMessage = RawBlockMessageSerializer.read(bytes) - def apply(block: Block): BlockMessage = BlockMessageImpl(block) - } /** @@ -147,41 +140,39 @@ object GetBlocksMessage extends Factory[GetBlocksMessage] { * The getdata message requests one or more data objects from another node. * The objects are requested by an inventory, * which the requesting node typically previously received by way of an inv message. + * + * @param inventoryCount The number of inventory entries + * @param inventories One or more inventory entries up to a maximum of 50,000 entries. + * * @see [[https://bitcoin.org/en/developer-reference#getdata]] */ -trait GetDataMessage extends DataPayload { - - /** - * The number of inventory enteries - */ - def inventoryCount: CompactSizeUInt - - /** - * One or more inventory entries up to a maximum of 50,000 entries. - */ - def inventories: Seq[Inventory] - +case class GetDataMessage( + inventoryCount: CompactSizeUInt, + inventories: Seq[Inventory]) + extends DataPayload { override def commandName = NetworkPayload.getDataCommandName override def bytes: ByteVector = RawGetDataMessageSerializer.write(this) + + override def toString(): String = { + + val count = s"inventoryCount=${inventoryCount.toInt}" + val invs = s"inventories=${ + val base = inventories.toString + val cutoff = 100 + if (base.length() > cutoff) base.take(cutoff) + "..."
+ else base + }" + s"GetDataMessage($count, $invs)" + } } object GetDataMessage extends Factory[GetDataMessage] { - private case class GetDataMessageImpl( - inventoryCount: CompactSizeUInt, - inventories: Seq[Inventory]) - extends GetDataMessage override def fromBytes(bytes: ByteVector): GetDataMessage = { RawGetDataMessageSerializer.read(bytes) } - def apply( - inventoryCount: CompactSizeUInt, - inventories: Seq[Inventory]): GetDataMessage = { - GetDataMessageImpl(inventoryCount, inventories) - } - def apply(inventories: Seq[Inventory]): GetDataMessage = { val inventoryCount = CompactSizeUInt(UInt64(inventories.length)) GetDataMessage(inventoryCount, inventories) @@ -294,9 +285,17 @@ case class HeadersMessage(count: CompactSizeUInt, headers: Vector[BlockHeader]) object HeadersMessage extends Factory[HeadersMessage] { + /** The maximum amount of headers sent in one `headers` message + * + * @see [[https://bitcoin.org/en/developer-reference#getheaders bitcoin.org]] + * developer reference + */ + val MaxHeadersCount: Int = 2000 + def fromBytes(bytes: ByteVector): HeadersMessage = RawHeadersMessageSerializer.read(bytes) + /** Constructs a `headers` message from the given headers */ def apply(headers: Vector[BlockHeader]): HeadersMessage = { val count = CompactSizeUInt(UInt64(headers.length)) HeadersMessage(count, headers) @@ -384,11 +383,11 @@ case object MemPoolMessage extends DataPayload { * * @see [[https://bitcoin.org/en/developer-reference#merkleblock]] * - * @param merkleBlock The actual [[org.bitcoins.core.protocol.blockchain.MerkleBlock MerkleBlock]] that this message represents + * @param merkleBlock The actual [[org.bitcoins.core.protocol.blockchain.MerkleBlock MerkleBlock]] that this message represents */ case class MerkleBlockMessage(merkleBlock: MerkleBlock) extends DataPayload { - override def commandName = NetworkPayload.merkleBlockCommandName + override val commandName = NetworkPayload.merkleBlockCommandName def bytes: ByteVector = RawMerkleBlockMessageSerializer.write(this) @@ -447,16 +446,12 @@ object NotFoundMessage extends Factory[NotFoundMessage] { /** * The tx message transmits a single transaction in the raw transaction format. 
* It can be sent in a variety of situations; + * @param transaction The transaction being sent over the wire * @see [[https://bitcoin.org/en/developer-reference#tx]] */ -trait TransactionMessage extends DataPayload { +case class TransactionMessage(transaction: Transaction) extends DataPayload { - /** - * The transaction being sent over the wire - */ - def transaction: Transaction - - override def commandName = NetworkPayload.transactionCommandName + override val commandName = NetworkPayload.transactionCommandName override def bytes: ByteVector = RawTransactionMessageSerializer.write(this) override def toString(): String = s"TransactionMessage(${transaction.txIdBE})" @@ -468,14 +463,8 @@ trait TransactionMessage extends DataPayload { */ object TransactionMessage extends Factory[TransactionMessage] { - private case class TransactionMessageImpl(transaction: Transaction) - extends TransactionMessage - def fromBytes(bytes: ByteVector): TransactionMessage = RawTransactionMessageSerializer.read(bytes) - - def apply(transaction: Transaction): TransactionMessage = - TransactionMessageImpl(transaction) } /** @@ -617,6 +606,12 @@ object FilterAddMessage extends Factory[FilterAddMessage] { element: ByteVector): FilterAddMessage = { FilterAddMessageImpl(elementSize, element) } + + /** Constructs a `FilterAddMessage` from the given hash digest */ + def fromHash(hash: HashDigest): FilterAddMessage = { + FilterAddMessageImpl(CompactSizeUInt(UInt64(hash.bytes.length)), hash.bytes) + } + } /** diff --git a/node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala b/node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala new file mode 100644 index 0000000000..c8fb0b4bc6 --- /dev/null +++ b/node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala @@ -0,0 +1,189 @@ +package org.bitcoins.node + +import org.bitcoins.testkit.wallet.BitcoinSWalletTest +import org.scalatest.FutureOutcome +import org.bitcoins.node.models.Peer +import org.bitcoins.chain.models.BlockHeaderDAO +import org.bitcoins.chain.blockchain.ChainHandler +import org.bitcoins.testkit.node.NodeTestUtil +import org.bitcoins.chain.config.ChainAppConfig +import org.bitcoins.node.config.NodeAppConfig +import org.bitcoins.node.networking.peer.DataMessageHandler +import org.bitcoins.core.protocol.transaction.Transaction +import scala.concurrent._ +import scala.concurrent.duration._ +import org.scalatest.compatible.Assertion +import org.bitcoins.core.currency._ +import scala.util.Try +import akka.actor.Cancellable +import org.scalatest.run +import org.scalatest.exceptions.TestFailedException +import org.bitcoins.core.wallet.fee.SatoshisPerByte + +class UpdateBloomFilterTest extends BitcoinSWalletTest { + override type FixtureParam = WalletWithBitcoind + + def withFixture(test: OneArgAsyncTest): FutureOutcome = + withFundedWalletAndBitcoind(test) + + it must "update the bloom filter with an address" in { param => + val WalletWithBitcoind(wallet, rpc) = param + implicit val chainConf: ChainAppConfig = config + implicit val nodeConf: NodeAppConfig = config + + val assertionP = Promise[Assertion] + val assertionF = assertionP.future + + // we want to schedule a runnable that aborts + // the test after a timeout, but then + // we need to cancel that runnable once + // we get a result + var cancelable: Option[Cancellable] = None + val timeout = 15.seconds + + for { + _ <- config.initialize() + + firstBloom <- wallet.getBloomFilter() + + // this has to be generated after our bloom filter + // is calculated + 
addressFromWallet <- wallet.getNewAddress() + + spv <- { + val callback = SpvNodeCallbacks.onTxReceived { tx => + rpc.getRawTransaction(tx.txIdBE).foreach { res => + val paysToOurAddress = + // we check if any of the addresses in the TX + // pays to our wallet address + res.vout.exists(_.scriptPubKey.addresses match { + case None => false + case Some(addresses) => addresses.exists(_ == addressFromWallet) + }) + cancelable.forall(_.cancel()) + assertionP.complete { + Try { + assert(paysToOurAddress) + } + } + } + } + + val peer = Peer.fromBitcoind(rpc.instance) + val chain = { + val dao = BlockHeaderDAO() + ChainHandler(dao) + } + val spv = + SpvNode(peer, chain, bloomFilter = firstBloom, callbacks = callback) + spv.start() + } + _ <- spv.sync() + _ <- NodeTestUtil.awaitSync(spv, rpc) + + _ = spv.updateBloomFilter(addressFromWallet) + _ = { + val runnable = new Runnable { + override def run: Unit = { + assertionP.failure( + new TestFailedException( + s"Did not receive a TX message after $timeout!", + failedCodeStackDepth = 0)) + } + } + cancelable = Some { + actorSystem.scheduler.scheduleOnce(timeout, runnable) + } + } + _ <- rpc.sendToAddress(addressFromWallet, 1.bitcoin) + assertion <- assertionF + } yield assertion + } + + it must "update the bloom filter with a TX" in { param => + val WalletWithBitcoind(wallet, rpc) = param + implicit val chainConf: ChainAppConfig = config + implicit val nodeConf: NodeAppConfig = config + + val assertionP = Promise[Assertion] + val assertionF = assertionP.future + + // we want to schedule a runnable that aborts + // the test after a timeout, but then + // we need to cancel that runnable once + // we get a result + var cancelable: Option[Cancellable] = None + + // the TX we sent from our wallet to bitcoind, + // we expect to get notified once this is + // confirmed + var txFromWallet: Option[Transaction] = None + val timeout = 15.seconds + + for { + _ <- config.initialize() + + firstBloom <- wallet.getBloomFilter() + + spv <- { + val callback = SpvNodeCallbacks.onMerkleBlockReceived { (block, txs) => + val isFromOurWallet = txFromWallet.exists(tx => txs.contains(tx)) + // we might receive more merkle blocks than just the + // one for our TX + if (isFromOurWallet) { + assertionP.success(assert(isFromOurWallet)) + } + } + + val peer = Peer.fromBitcoind(rpc.instance) + val chain = { + val dao = BlockHeaderDAO() + ChainHandler(dao) + } + val spv = + SpvNode(peer, chain, bloomFilter = firstBloom, callbacks = callback) + spv.start() + } + _ <- spv.sync() + _ <- NodeTestUtil.awaitSync(spv, rpc) + + addressFromBitcoind <- rpc.getNewAddress + tx <- wallet + .sendToAddress(addressFromBitcoind, + 5.bitcoin, + SatoshisPerByte(100.sats)) + .map { tx => + txFromWallet = Some(tx) + tx + } + + _ = { + val _ = spv.broadcastTransaction(tx) + val SpvNode(_, _, newBloom, _) = spv.updateBloomFilter(tx) + assert(newBloom.contains(tx.txId)) + + cancelable = Some { + actorSystem.scheduler.scheduleOnce( + timeout, + new Runnable { + override def run: Unit = { + if (!assertionP.isCompleted) + assertionP.failure( + new TestFailedException( + s"Did not receive a merkle block message after $timeout!", + failedCodeStackDepth = 0)) + } + } + ) + } + } + // this should confirm our TX + // since we updated the bloom filter + // we should get notified about the block + _ <- rpc.getNewAddress.flatMap(rpc.generateToAddress(1, _)) + + assertion <- assertionF + } yield assertion + + } +} diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/peer/MerkleBuffersTest.scala 
b/node-test/src/test/scala/org/bitcoins/node/networking/peer/MerkleBuffersTest.scala new file mode 100644 index 0000000000..87e4064713 --- /dev/null +++ b/node-test/src/test/scala/org/bitcoins/node/networking/peer/MerkleBuffersTest.scala @@ -0,0 +1,82 @@ +package org.bitcoins.node.networking.peer + +import org.bitcoins.testkit.util.BitcoinSUnitTest +import org.bitcoins.testkit.Implicits._ +import org.bitcoins.core.protocol.blockchain.MerkleBlock +import org.bitcoins.testkit.core.gen.BlockchainElementsGenerator +import org.bitcoins.testkit.core.gen.TransactionGenerators +import org.scalacheck.Gen +import org.bitcoins.core.protocol.transaction.Transaction +import org.bitcoins.core.protocol.blockchain.Block +import _root_.org.scalatest.compatible.Assertion +import scala.concurrent.Future +import scala.util.Success +import scala.util.Try +import scala.util.Failure + +class MerkleBuffersTest extends BitcoinSUnitTest { + behavior of "MerkleBuffers" + + /** Generating blocks and transactions take a little while, + * this is to prevent the test from taking a _really_ long + * time + */ + implicit override val generatorDrivenConfig: PropertyCheckConfiguration = + customGenDrivenConfig(executions = 3) + + it must "match a merkle block with its corresponding transactions" in { + + val txsAndBlockGen: Gen[(Seq[Transaction], Seq[Transaction], Block)] = for { + txs <- Gen.nonEmptyListOf(TransactionGenerators.transaction) + otherTxs <- Gen.nonEmptyListOf(TransactionGenerators.transaction) + block <- BlockchainElementsGenerator.block(txs) + } yield (txs, otherTxs, block) + + forAll(txsAndBlockGen) { + + case (txs, otherTxs, block) => + var receivedExpectedTXs: Option[Try[Assertion]] = None + var callbackCount: Int = 0 + val callback: DataMessageHandler.OnMerkleBlockReceived = { + (_, merkleTxs) => + receivedExpectedTXs = Some( + Try( + assert(txs == merkleTxs, + "Received TXs in callback was not the ones we put in"))) + callbackCount = callbackCount + 1 + } + + val merkle = MerkleBlock(block, txs.map(_.txId)) + val _ = MerkleBuffers.putMerkle(merkle) + + txs.map { tx => + val matches = MerkleBuffers.putTx(tx, Seq(callback)) + assert( + matches, + s"TX ${tx.txIdBE} did not match any merkle block in MerkleBuffers") + } + + otherTxs.map { tx => + val matches = MerkleBuffers.putTx(tx, Seq(callback)) + assert( + !matches, + s"Unrelated TX ${tx.txIdBE} did match merkle block in MerkleBuffers") + + } + + assert(callbackCount != 0, + "Callback was not called after processing all TXs!") + + assert(callbackCount == 1, + s"Callback was called multiple times: $callbackCount") + + receivedExpectedTXs match { + case None => fail("Callback was never called") + case Some(Success(assertion)) => assertion + case Some(Failure(exc)) => fail(exc) + } + + } + + } +} diff --git a/node/src/main/scala/org/bitcoins/node/SpvNode.scala b/node/src/main/scala/org/bitcoins/node/SpvNode.scala index 3816b6b860..9f288ec93f 100644 --- a/node/src/main/scala/org/bitcoins/node/SpvNode.scala +++ b/node/src/main/scala/org/bitcoins/node/SpvNode.scala @@ -12,7 +12,6 @@ import org.bitcoins.rpc.util.AsyncUtil import scala.concurrent.Future import org.bitcoins.core.bloom.BloomFilter -import org.bitcoins.core.p2p.FilterLoadMessage import org.bitcoins.core.p2p.NetworkPayload import org.bitcoins.core.protocol.transaction.Transaction import org.bitcoins.node.models.BroadcastAbleTransaction @@ -22,6 +21,7 @@ import scala.util.Failure import scala.util.Success import org.bitcoins.db.P2PLogger import org.bitcoins.node.config.NodeAppConfig +import 
org.bitcoins.core.protocol.BitcoinAddress case class SpvNode( peer: Peer, @@ -44,6 +44,36 @@ case class SpvNode( PeerMessageSender(client) } + /** Updates our bloom filter to match the given TX + * + * @return SPV node with the updated bloom filter + */ + def updateBloomFilter(transaction: Transaction): SpvNode = { + logger.info(s"Updating bloom filter with transaction=${transaction.txIdBE}") + val newBloom = bloomFilter.update(transaction) + + // we could send filteradd messages, but we would + // then need to calculate all the new elements in + // the filter. this is easier:-) + peerMsgSender.sendFilterClearMessage() + peerMsgSender.sendFilterLoadMessage(newBloom) + + copy(bloomFilter = newBloom) + } + + /** Updates our bloom filter to match the given address + * + * @return SPV node with the updated bloom filter + */ + def updateBloomFilter(address: BitcoinAddress): SpvNode = { + logger.info(s"Updating bloom filter with address=$address") + val hash = address.hash + val newBloom = bloomFilter.insert(hash) + peerMsgSender.sendFilterAddMessage(hash) + + copy(bloomFilter = newBloom) + } + /** * Sends the given P2P to our peer. * This method is useful for playing around @@ -74,8 +104,7 @@ case class SpvNode( } } yield { logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer") - val filterMsg = FilterLoadMessage(bloomFilter) - val _ = send(filterMsg) + val _ = peerMsgSender.sendFilterLoadMessage(bloomFilter) node } } diff --git a/node/src/main/scala/org/bitcoins/node/networking/P2PClient.scala b/node/src/main/scala/org/bitcoins/node/networking/P2PClient.scala index adc7150c82..3b917e0e55 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/P2PClient.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/P2PClient.scala @@ -287,10 +287,10 @@ object P2PClient extends P2PLogger { remainingBytes.length) loop(newRemainingBytes, message :: accum) } - case Failure(exception) => + case Failure(exc) => logger.error( - "Failed to parse network message, could be because TCP frame isn't aligned", - exception) + s"Failed to parse network message, could be because TCP frame isn't aligned: $exc") + //this case means that our TCP frame was not aligned with bitcoin protocol //return the unaligned bytes so we can apply them to the next tcp frame of bytes we receive //http://stackoverflow.com/a/37979529/967713 diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index 1c807db7b2..30a5163cff 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -19,6 +19,7 @@ import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.core.p2p.TypeIdentifier import org.bitcoins.core.p2p.MsgUnassigned import org.bitcoins.db.P2PLogger +import org.bitcoins.core.p2p.Inventory /** This actor is meant to handle a [[org.bitcoins.core.p2p.DataPayload DataPayload]] * that a peer to sent to us on the p2p network, for instance, if we a receive a @@ -29,9 +30,6 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)( appConfig: NodeAppConfig) extends P2PLogger { - private val callbackNum = callbacks.onBlockReceived.length + callbacks.onMerkleBlockReceived.length + callbacks.onTxReceived.length - logger.debug(s"Given $callbackNum of callback(s)") - private val txDAO = BroadcastAbleTransactionDAO(SQLiteProfile) def handleDataPayload( @@ -68,29 
+66,61 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)( } FutureUtil.unit - case headersMsg: HeadersMessage => + case HeadersMessage(count, headers) => + logger.debug(s"Received headers message with ${count.toInt} headers") logger.trace( - s"Received headers message with ${headersMsg.count.toInt} headers") - val headers = headersMsg.headers + s"Received headers=${headers.map(_.hashBE.hex).mkString("[", ",", "]")}") val chainApiF = chainHandler.processHeaders(headers) - chainApiF.map { newApi => - val lastHeader = headers.last - val lastHash = lastHeader.hash - newApi.getBlockCount.map { count => - logger.trace( - s"Processed headers, most recent has height=$count and hash=$lastHash.") + logger.trace(s"Requesting data for headers=${headers.length}") + peerMsgSender.sendGetDataMessage(headers: _*) + + chainApiF + .map { newApi => + if (headers.nonEmpty) { + + val lastHeader = headers.last + val lastHash = lastHeader.hash + newApi.getBlockCount.map { count => + logger.trace( + s"Processed headers, most recent has height=$count and hash=$lastHash.") + } + + if (count.toInt == HeadersMessage.MaxHeadersCount) { + logger.error( + s"Received maximum amount of headers in one header message. This means we are not synced, requesting more") + peerMsgSender.sendGetHeadersMessage(lastHash) + } else { + logger.debug( + List(s"Received headers=${count.toInt} in one message,", + "which is less than max. This means we are synced,", + "not requesting more.") + .mkString(" ")) + } + + } + } + .failed + .map { err => + logger.error(s"Error when processing headers message", err) } - peerMsgSender.sendGetHeadersMessage(lastHash) - } case msg: BlockMessage => Future { callbacks.onBlockReceived.foreach(_.apply(msg.block)) } - case msg: TransactionMessage => - Future { callbacks.onTxReceived.foreach(_.apply(msg.transaction)) } - case msg: MerkleBlockMessage => - Future { - callbacks.onMerkleBlockReceived.foreach(_.apply(msg.merkleBlock)) + case TransactionMessage(tx) => + val belongsToMerkle = + MerkleBuffers.putTx(tx, callbacks.onMerkleBlockReceived) + if (belongsToMerkle) { + logger.trace( + s"Transaction=${tx.txIdBE} belongs to merkleblock, not calling callbacks") + FutureUtil.unit + } else { + logger.trace( + s"Transaction=${tx.txIdBE} does not belong to merkleblock, processing given callbacks") + Future { callbacks.onTxReceived.foreach(_.apply(tx)) } } + case MerkleBlockMessage(merkleBlock) => + MerkleBuffers.putMerkle(merkleBlock) + FutureUtil.unit case invMsg: InventoryMessage => handleInventoryMsg(invMsg = invMsg, peerMsgSender = peerMsgSender) } @@ -100,7 +130,11 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)( invMsg: InventoryMessage, peerMsgSender: PeerMessageSender): Future[Unit] = { logger.info(s"Received inv=${invMsg}") - val getData = GetDataMessage(invMsg.inventories) + val getData = GetDataMessage(invMsg.inventories.map { + case Inventory(TypeIdentifier.MsgBlock, hash) => + Inventory(TypeIdentifier.MsgFilteredBlock, hash) + case other: Inventory => other + }) peerMsgSender.sendMsg(getData) FutureUtil.unit @@ -112,8 +146,8 @@ object DataMessageHandler { /** Callback for handling a received block */ type OnBlockReceived = Block => Unit - /** Callback for handling a received Merkle block */ - type OnMerkleBlockReceived = MerkleBlock => Unit + /** Callback for handling a received Merkle block with its corresponding TXs */ + type OnMerkleBlockReceived = (MerkleBlock, Vector[Transaction]) => Unit /** Callback for handling a received 
transaction */ type OnTxReceived = Transaction => Unit diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/MerkleBuffers.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/MerkleBuffers.scala new file mode 100644 index 0000000000..1f8a58b323 --- /dev/null +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/MerkleBuffers.scala @@ -0,0 +1,115 @@ +package org.bitcoins.node.networking.peer + +import org.bitcoins.core.util.BitcoinSLogger +import scala.collection.mutable +import org.bitcoins.core.protocol.blockchain.MerkleBlock +import org.bitcoins.core.protocol.transaction.Transaction + +/** + * A buffer of merkleblocks and the transactions associated with them. + * + * When receiving a merkleblock message over the P2P network, the + * corresponding transactions are sent immediately after. That means + * we have to correlate the received merkleblocks with the matching + * transactions. + * + * This buffer is responsible for calling the appropriate callbacks + * once a merkle block has received all its transactions. + */ +private[peer] object MerkleBuffers extends BitcoinSLogger { + private type MerkleBlocksWithTransactions = + mutable.Map[MerkleBlock, mutable.Builder[Transaction, Vector[Transaction]]] + + private val underlyingMap: MerkleBlocksWithTransactions = mutable.Map.empty + + /** Adds the given merkleblock to the buffer */ + def putMerkle(merkle: MerkleBlock): Unit = { + val tree = merkle.partialMerkleTree + val matches = tree.extractMatches + + logger.debug(s"Received merkle block, expecting ${matches.length} TX(s)") + + if (matches.nonEmpty) { + logger.trace(s"Adding merkleBlock=${merkle.blockHeader.hashBE} to buffer") + underlyingMap.put(merkle, + // it's important to use a collection + // type that can call .result() without + // clearing the builder + Vector.newBuilder) + } else { + logger.trace( + s"Merkleblock=${merkle.blockHeader.hashBE} has no matches, not adding to buffer") + } + + () + } + + /** Attempts to add the given transaction to a corresponding + * merkleblock in the buffer. + * + * @param tx The transaction to (maybe) add to the buffer + * @param callbacks The callbacks to execute if we're + * finished processing a merkleblock + * + * @return If the transaction matches a merkle block, returns true. + * Otherwise, false.
+ */ + def putTx( + tx: Transaction, + callbacks: Seq[DataMessageHandler.OnMerkleBlockReceived]): Boolean = { + val blocksInBuffer = underlyingMap.keys.toList + logger.trace(s"Looking for transaction=${tx.txIdBE} in merkleblock buffer") + logger.trace(s"Merkleblocks in buffer: ${blocksInBuffer.length}") + blocksInBuffer.find { block => + val matches = block.partialMerkleTree.extractMatches + + logger.trace( + s"Block=${block.blockHeader.hashBE} has matches=${matches.map(_.flip)}") + + matches.exists(_ == tx.txId) + } match { + case None => + logger.debug( + s"Transaction=${tx.txIdBE} does not belong to any merkle block") + false + case Some(key) => + handleMerkleMatch(tx, merkleBlock = key, callbacks = callbacks) + } + } + + // TODO Scaladoc + private def handleMerkleMatch( + transaction: Transaction, + merkleBlock: MerkleBlock, + callbacks: Seq[DataMessageHandler.OnMerkleBlockReceived]) = { + val merkleBlockMatches = merkleBlock.partialMerkleTree.extractMatches + val merkleHash = merkleBlock.blockHeader.hashBE + + val txHash = transaction.txIdBE + + logger.debug(s"Transaction=$txHash matched merkleBlock=$merkleHash") + + logger.trace(s"Adding transaction=$txHash to buffer") + val builder = underlyingMap(merkleBlock) // TODO: error handling + builder += transaction + + val transactionSoFar = builder.result() + val transactionSoFarCount = transactionSoFar.length + val matchesCount = merkleBlockMatches.length + if (transactionSoFarCount == matchesCount) { + logger.debug( + s"We've received all transactions ($transactionSoFarCount) for merkleBlock=$merkleHash") + + logger.trace(s"Removing merkle block from buffer") + underlyingMap.remove(merkleBlock) // TODO: error handling + + logger.trace(s"Calling merkle block callback(s)") + callbacks.foreach(_.apply(merkleBlock, transactionSoFar)) + } else { + logger.trace( + s"We've received $transactionSoFarCount, expecting $matchesCount") + assert(transactionSoFarCount < matchesCount) + } + true + } +} diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageSender.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageSender.scala index 3916166d56..80ea4aca72 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageSender.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageSender.scala @@ -9,6 +9,9 @@ import org.bitcoins.node.networking.P2PClient import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.core.protocol.transaction.Transaction import org.bitcoins.db.P2PLogger +import org.bitcoins.core.crypto.HashDigest +import org.bitcoins.core.bloom.BloomFilter +import org.bitcoins.core.protocol.blockchain.BlockHeader case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig) extends P2PLogger { @@ -67,12 +70,38 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig) sendMsg(message) } + def sendFilterClearMessage(): Unit = { + sendMsg(FilterClearMessage) + } + + def sendFilterAddMessage(hash: HashDigest): Unit = { + val message = FilterAddMessage.fromHash(hash) + logger.trace(s"Sending filteradd=$message to peer=${client.peer}") + sendMsg(message) + } + + def sendFilterLoadMessage(bloom: BloomFilter): Unit = { + val message = FilterLoadMessage(bloom) + logger.trace(s"Sending filterload=$message to peer=${client.peer}") + sendMsg(message) + } + def sendTransactionMessage(transaction: Transaction): Unit = { val message = TransactionMessage(transaction) logger.trace(s"Sending txmessage=$message to peer=${client.peer}") 
sendMsg(message) } + /** Sends a request for filtered blocks matching the given headers */ + def sendGetDataMessage(headers: BlockHeader*): Unit = { + val inventories = + headers.map(header => + Inventory(TypeIdentifier.MsgFilteredBlock, header.hash)) + val message = GetDataMessage(inventories) + logger.info(s"Sending getdata=$message to peer=${client.peer}") + sendMsg(message) + } + private[node] def sendMsg(msg: NetworkPayload): Unit = { logger.debug(s"Sending msg=${msg.commandName} to peer=${socket}") val newtworkMsg = NetworkMessage(conf.network, msg) diff --git a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestUtil.scala b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestUtil.scala index cfd8294ca4..88fdcaaa88 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestUtil.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestUtil.scala @@ -19,7 +19,7 @@ import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import akka.actor.ActorSystem import org.bitcoins.core.util.BitcoinSLogger -import org.bitcoins.rpc.util.AsyncUtil +import org.bitcoins.testkit.async.TestAsyncUtil import org.bitcoins.core.bloom.BloomFilter import org.bitcoins.core.bloom.BloomUpdateAll @@ -116,12 +116,30 @@ abstract class NodeTestUtil extends BitcoinSLogger { } } - /** Awaits sync between the given SPV node and bitcoind client */ + /** Checks if the given light client and bitcoind + * has the same number of blocks in their blockchains + */ + def isSameBlockCount(spv: SpvNode, rpc: BitcoindRpcClient)( + implicit ec: ExecutionContext): Future[Boolean] = { + val rpcCountF = rpc.getBlockCount + val spvCountF = spv.chainApi.getBlockCount + for { + spvCount <- spvCountF + rpcCount <- rpcCountF + } yield rpcCount == spvCount + } + + /** Awaits sync between the given SPV node and bitcoind client + * + * TODO: We should check for hash, not block height. 
however, + * our way of determining what the best hash is when having + * multiple tips is not good enough yet + */ def awaitSync(node: SpvNode, rpc: BitcoindRpcClient)( implicit sys: ActorSystem): Future[Unit] = { import sys.dispatcher - AsyncUtil - .retryUntilSatisfiedF(() => isSameBestHash(node, rpc), 500.milliseconds) + TestAsyncUtil + .retryUntilSatisfiedF(() => isSameBlockCount(node, rpc), 500.milliseconds) } } diff --git a/wallet-test/src/test/scala/org/bitcoins/wallet/WalletBloomTest.scala b/wallet-test/src/test/scala/org/bitcoins/wallet/WalletBloomTest.scala index 08d99cb85d..2292eff1c5 100644 --- a/wallet-test/src/test/scala/org/bitcoins/wallet/WalletBloomTest.scala +++ b/wallet-test/src/test/scala/org/bitcoins/wallet/WalletBloomTest.scala @@ -15,7 +15,7 @@ class WalletBloomTest extends BitcoinSWalletTest { override type FixtureParam = WalletWithBitcoind override def withFixture(test: OneArgAsyncTest): FutureOutcome = - withNewWalletAndBitcoind(test) + withFundedWalletAndBitcoind(test) it should "generate a bloom filter that matches the pubkeys in our wallet" in { param => @@ -32,19 +32,12 @@ class WalletBloomTest extends BitcoinSWalletTest { } } - // TODO: change fixture to withFundedWalletAndBitcoind once #577 goes in - // https://github.com/bitcoin-s/bitcoin-s/pull/577/files#diff-0fb6ac004fe1e550b7c13258d7d0706cR154 it should "generate a bloom filter that matches the outpoints in our wallet" in { param => val WalletWithBitcoind(walletApi, bitcoind) = param val wallet = walletApi.asInstanceOf[Wallet] for { - address <- wallet.getNewAddress() - tx <- bitcoind - .sendToAddress(address, 5.bitcoins) - .flatMap(bitcoind.getRawTransaction(_)) - _ <- wallet.processTransaction(tx.hex, confirmations = 0) outpoints <- wallet.listOutpoints() bloom <- wallet.getBloomFilter() diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala index 5a422bf803..52339befb3 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala @@ -28,7 +28,7 @@ private[wallet] trait TransactionProcessing extends KeyHandlingLogger { logger.info( s"Processing transaction=${transaction.txIdBE} with confirmations=$confirmations") processTransactionImpl(transaction, confirmations).map { - case ProcessTxResult(outgoing, incoming) => + case ProcessTxResult(incoming, outgoing) => logger.info( s"Finished processing of transaction=${transaction.txIdBE}. Relevant incomingTXOs=${incoming.length}, outgoingTXOs=${outgoing.length}") this
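
A minimal usage sketch of the API this patch adds, distilled from UpdateBloomFilterTest. This is illustration only, not part of the patch: it assumes the implicit ChainAppConfig, NodeAppConfig and ExecutionContext that the test fixture provides, plus a funded `wallet` and a bitcoind `rpc` connection.

    // The merkle block callback now also receives the matched transactions,
    // correlated for us by MerkleBuffers.
    val callbacks = SpvNodeCallbacks.onMerkleBlockReceived { (merkleBlock, txs) =>
      println(s"block=${merkleBlock.blockHeader.hashBE} matched ${txs.length} TX(s)")
    }

    val peer = Peer.fromBitcoind(rpc.instance)
    val chain = ChainHandler(BlockHeaderDAO())

    for {
      bloom <- wallet.getBloomFilter()
      address <- wallet.getNewAddress()
      node <- SpvNode(peer, chain, bloomFilter = bloom, callbacks = callbacks).start()
      _ <- node.sync()
      // sends filteradd for the address and returns a node carrying the new filter;
      // the Transaction overload sends filterclear + filterload instead
      updated = node.updateBloomFilter(address)
    } yield updated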