diff --git a/app/server/src/main/scala/org/bitcoins/server/Main.scala b/app/server/src/main/scala/org/bitcoins/server/Main.scala index e7534ae365..ae8f78bf46 100644 --- a/app/server/src/main/scala/org/bitcoins/server/Main.scala +++ b/app/server/src/main/scala/org/bitcoins/server/Main.scala @@ -43,11 +43,6 @@ object Main extends App { implicit val system = ActorSystem("bitcoin-s") import system.dispatcher - sys.addShutdownHook { - logger.error(s"Exiting process") - system.terminate().foreach(_ => logger.info(s"Actor system terminated")) - } - /** Log the given message, shut down the actor system and quit. */ def error(message: Any): Nothing = { logger.error(s"FATAL: $message") @@ -128,7 +123,17 @@ object Main extends App { Seq(walletRoutes, nodeRoutes, chainRoutes)) server.start() } - } yield start + } yield { + + sys.addShutdownHook { + logger.error(s"Exiting process") + + node.stop().foreach(_ => logger.info(s"Stopped SPV node")) + system.terminate().foreach(_ => logger.info(s"Actor system terminated")) + } + + start + } startFut.failed.foreach { err => logger.info(s"Error on server startup!", err) diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/Blockchain.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/Blockchain.scala index 7585d4b447..cfbc0b7d00 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/Blockchain.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/Blockchain.scala @@ -63,6 +63,8 @@ object Blockchain extends ChainVerificationLogger { def connectTip(header: BlockHeader, blockHeaderDAO: BlockHeaderDAO)( implicit ec: ExecutionContext, conf: ChainAppConfig): Future[BlockchainUpdate] = { + logger.debug( + s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain") //get all competing chains we have val blockchainsF: Future[Vector[Blockchain]] = @@ -76,8 +78,8 @@ object Blockchain extends ChainVerificationLogger { blockchain.find(_.hashBE == header.previousBlockHashBE) prevBlockHeaderOpt match { case None => - logger.debug( - s"No common ancestor found in the chain to connect to ${header.hashBE}") + logger.warn( + s"No common ancestor found in the chain to connect header=${header.hashBE.hex}") val err = TipUpdateResult.BadPreviousBlockHash(header) val failed = BlockchainUpdate.Failed(blockchain = blockchain, failedHeader = header, @@ -86,8 +88,8 @@ object Blockchain extends ChainVerificationLogger { case Some(prevBlockHeader) => //found a header to connect to! - logger.debug( - s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain") + logger.trace( + s"Found ancestor=${prevBlockHeader.hashBE.hex} for header=${header.hashBE.hex}") val tipResultF = TipValidation.checkNewTip(newPotentialTip = header, currentTip = prevBlockHeader, @@ -96,10 +98,14 @@ object Blockchain extends ChainVerificationLogger { tipResultF.map { tipResult => tipResult match { case TipUpdateResult.Success(headerDb) => + logger.debug( + s"Successfully verified=${headerDb.hashBE.hex}, connecting to chain") val newChain = Blockchain.fromHeaders(headerDb +: blockchain.headers) BlockchainUpdate.Successful(newChain, headerDb) case fail: TipUpdateResult.Failure => + logger.warn( + s"Could not verify header=${header.hashBE.hex}, reason=$fail") BlockchainUpdate.Failed(blockchain, header, fail) } } diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala index cb8a966d64..e6cf02d48e 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala @@ -8,6 +8,11 @@ import org.bitcoins.core.protocol.blockchain.BlockHeader import scala.concurrent.{ExecutionContext, Future} import org.bitcoins.db.ChainVerificationLogger +import org.bitcoins.chain.validation.TipUpdateResult.BadNonce +import org.bitcoins.chain.validation.TipUpdateResult.BadPOW +import org.bitcoins.chain.validation.TipUpdateResult.BadPreviousBlockHash +import org.bitcoins.core.util.FutureUtil +import org.bitcoins.chain.validation.TipUpdateResult /** * Chain Handler is meant to be the reference implementation @@ -41,6 +46,8 @@ case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)( override def processHeader(header: BlockHeader)( implicit ec: ExecutionContext): Future[ChainHandler] = { + logger.debug( + s"Processing header=${header.hashBE.hex}, previousHash=${header.previousBlockHashBE.hex}") val blockchainUpdateF = Blockchain.connectTip(header, blockHeaderDAO) @@ -52,14 +59,17 @@ case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)( val createdF = blockHeaderDAO.create(updatedHeader) createdF.map { header => logger.debug( - s"Connected new header to blockchain, height=${header.height} hash=${header.hashBE}") + s"Connected new header to blockchain, height=${header.height} hash=${header.hashBE.hex}") ChainHandler(blockHeaderDAO) } case BlockchainUpdate.Failed(_, _, reason) => val errMsg = s"Failed to add header to chain, header=${header.hashBE.hex} reason=${reason}" logger.warn(errMsg) - Future.failed(new RuntimeException(errMsg)) + // potential chain split happening, let's log what's going on + logTipConnectionFailure(reason).flatMap { _ => + Future.failed(new RuntimeException(errMsg)) + } } blockchainUpdateF.failed.foreach { err => @@ -71,6 +81,32 @@ case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)( newHandlerF } + /** Logs a tip connection failure by querying local chain state + * and comparing it to the received `TipUpdateResult` + */ + private def logTipConnectionFailure(failure: TipUpdateResult.Failure)( + implicit ec: ExecutionContext): Future[Unit] = { + failure match { + case _ @(_: BadPOW | _: BadNonce) => + // TODO: Log this in a meaningful way + FutureUtil.unit + case _: BadPreviousBlockHash => + blockHeaderDAO.chainTips.map { tips => + if (tips.length > 1) { + logger.warn { + s"We have multiple (${tips.length}) , competing chainTips=${tips + .map(_.hashBE.hex) + .mkString("[", ",", "]")}" + } + } else { + logger.warn( + s"We don't have competing chainTips. Most recent, valid header=${tips.head.hashBE.hex}") + } + } + } + + } + /** * @inheritdoc */ diff --git a/chain/src/main/scala/org/bitcoins/chain/validation/TipUpdateResult.scala b/chain/src/main/scala/org/bitcoins/chain/validation/TipUpdateResult.scala index 167e14ee38..a2a16b7599 100644 --- a/chain/src/main/scala/org/bitcoins/chain/validation/TipUpdateResult.scala +++ b/chain/src/main/scala/org/bitcoins/chain/validation/TipUpdateResult.scala @@ -18,7 +18,10 @@ object TipUpdateResult { } /** Means that [[org.bitcoins.core.protocol.blockchain.BlockHeader.previousBlockHashBE previousBlockHashBE]] was incorrect */ - case class BadPreviousBlockHash(header: BlockHeader) extends Failure + case class BadPreviousBlockHash(header: BlockHeader) extends Failure { + override def toString: String = + s"BadPreviousBlockHash(hash=${header.hashBE}, previous=${header.previousBlockHashBE})" + } /** Means that [[org.bitcoins.core.protocol.blockchain.BlockHeader.nBits nBits]] was invalid */ case class BadPOW(header: BlockHeader) extends Failure diff --git a/core/src/main/scala/org/bitcoins/core/p2p/Inventory.scala b/core/src/main/scala/org/bitcoins/core/p2p/Inventory.scala index b16dc1ca5e..1bb0ab77fd 100644 --- a/core/src/main/scala/org/bitcoins/core/p2p/Inventory.scala +++ b/core/src/main/scala/org/bitcoins/core/p2p/Inventory.scala @@ -8,40 +8,21 @@ import scodec.bits.ByteVector /** * These are used as unique identifiers inside the peer-to-peer network + * + * @param typeIdentifier The type of object which was hashed + * @param hash SHA256(SHA256()) hash of the object in internal byte order. + * * @see [[https://bitcoin.org/en/developer-reference#term-inventory]] */ -trait Inventory extends NetworkElement { - - /** - * The type of object which was hashed - * @return - */ - def typeIdentifier: TypeIdentifier - - /** - * SHA256(SHA256()) hash of the object in internal byte order. - * @return - */ - def hash: DoubleSha256Digest +case class Inventory(typeIdentifier: TypeIdentifier, hash: DoubleSha256Digest) + extends NetworkElement { override def bytes: ByteVector = RawInventorySerializer.write(this) - - override def toString(): String = s"Inventory($typeIdentifier, $hash)" } object Inventory extends Factory[Inventory] { - private case class InventoryImpl( - typeIdentifier: TypeIdentifier, - hash: DoubleSha256Digest) - extends Inventory - override def fromBytes(bytes: ByteVector): Inventory = RawInventorySerializer.read(bytes) - def apply( - typeIdentifier: TypeIdentifier, - hash: DoubleSha256Digest): Inventory = { - InventoryImpl(typeIdentifier, hash) - } } diff --git a/core/src/main/scala/org/bitcoins/core/p2p/NetworkPayload.scala b/core/src/main/scala/org/bitcoins/core/p2p/NetworkPayload.scala index ba5a95b7db..938afcd4e7 100644 --- a/core/src/main/scala/org/bitcoins/core/p2p/NetworkPayload.scala +++ b/core/src/main/scala/org/bitcoins/core/p2p/NetworkPayload.scala @@ -17,6 +17,7 @@ import org.bitcoins.core.config.NetworkParameters import java.net.InetSocketAddress import org.bitcoins.core.number.UInt32 import org.bitcoins.core.bloom.BloomFlag +import org.bitcoins.core.crypto.HashDigest /** * Trait that represents a payload for a message on the Bitcoin p2p network @@ -43,29 +44,21 @@ sealed trait DataPayload extends NetworkPayload /** * The block message transmits a single serialized block * + * @param block The block being transmitted inside of this message + * * @see [[https://bitcoin.org/en/developer-reference#block]] */ -trait BlockMessage extends DataPayload { - - /** - * The block being transmitted inside of this [[BlockMessage]] - */ - def block: Block - - override def commandName = NetworkPayload.blockCommandName +case class BlockMessage(block: Block) extends DataPayload { + override val commandName = NetworkPayload.blockCommandName override def bytes: ByteVector = RawBlockMessageSerializer.write(this) } object BlockMessage extends Factory[BlockMessage] { - private case class BlockMessageImpl(block: Block) extends BlockMessage - def fromBytes(bytes: ByteVector): BlockMessage = RawBlockMessageSerializer.read(bytes) - def apply(block: Block): BlockMessage = BlockMessageImpl(block) - } /** @@ -147,41 +140,39 @@ object GetBlocksMessage extends Factory[GetBlocksMessage] { * The getdata message requests one or more data objects from another node. * The objects are requested by an inventory, * which the requesting node typically previously received by way of an inv message. + * + * @param inventoryCount The number of inventory enteries + * @param inventories One or more inventory entries up to a maximum of 50,000 entries. + * * @see [[https://bitcoin.org/en/developer-reference#getdata]] */ -trait GetDataMessage extends DataPayload { - - /** - * The number of inventory enteries - */ - def inventoryCount: CompactSizeUInt - - /** - * One or more inventory entries up to a maximum of 50,000 entries. - */ - def inventories: Seq[Inventory] - +case class GetDataMessage( + inventoryCount: CompactSizeUInt, + inventories: Seq[Inventory]) + extends DataPayload { override def commandName = NetworkPayload.getDataCommandName override def bytes: ByteVector = RawGetDataMessageSerializer.write(this) + + override def toString(): String = { + + val count = s"inventoryCount=${inventoryCount.toInt}" + val invs = s"inventories=${ + val base = inventories.toString + val cutoff = 100 + if (base.length() > cutoff) base.take(cutoff) + "..." + else base + }" + s"GetDataMessage($count, $invs)" + } } object GetDataMessage extends Factory[GetDataMessage] { - private case class GetDataMessageImpl( - inventoryCount: CompactSizeUInt, - inventories: Seq[Inventory]) - extends GetDataMessage override def fromBytes(bytes: ByteVector): GetDataMessage = { RawGetDataMessageSerializer.read(bytes) } - def apply( - inventoryCount: CompactSizeUInt, - inventories: Seq[Inventory]): GetDataMessage = { - GetDataMessageImpl(inventoryCount, inventories) - } - def apply(inventories: Seq[Inventory]): GetDataMessage = { val inventoryCount = CompactSizeUInt(UInt64(inventories.length)) GetDataMessage(inventoryCount, inventories) @@ -294,9 +285,17 @@ case class HeadersMessage(count: CompactSizeUInt, headers: Vector[BlockHeader]) object HeadersMessage extends Factory[HeadersMessage] { + /** The maximum amount of headers sent in one `headers` message + * + * @see [[https://bitcoin.org/en/developer-reference#getheaders bitcoin.org]] + * developer reference + */ + val MaxHeadersCount: Int = 2000 + def fromBytes(bytes: ByteVector): HeadersMessage = RawHeadersMessageSerializer.read(bytes) + /** Constructs a `headers` message from the given headers */ def apply(headers: Vector[BlockHeader]): HeadersMessage = { val count = CompactSizeUInt(UInt64(headers.length)) HeadersMessage(count, headers) @@ -384,11 +383,11 @@ case object MemPoolMessage extends DataPayload { * * @see [[https://bitcoin.org/en/developer-reference#merkleblock]] * - * @param merkleBlock The actual [[org.bitcoins.core.protocol.blockchain.MerkleBlock MerkleBlock]] that this message represents + * @param merkleBlock The actual [[org.bitcoins.core.protocol.blockchain.MerkleBlock MerkleBlock]] that this message represents */ case class MerkleBlockMessage(merkleBlock: MerkleBlock) extends DataPayload { - override def commandName = NetworkPayload.merkleBlockCommandName + override val commandName = NetworkPayload.merkleBlockCommandName def bytes: ByteVector = RawMerkleBlockMessageSerializer.write(this) @@ -447,16 +446,12 @@ object NotFoundMessage extends Factory[NotFoundMessage] { /** * The tx message transmits a single transaction in the raw transaction format. * It can be sent in a variety of situations; + * @param transaction The transaction being sent over the wire * @see [[https://bitcoin.org/en/developer-reference#tx]] */ -trait TransactionMessage extends DataPayload { +case class TransactionMessage(transaction: Transaction) extends DataPayload { - /** - * The transaction being sent over the wire - */ - def transaction: Transaction - - override def commandName = NetworkPayload.transactionCommandName + override val commandName = NetworkPayload.transactionCommandName override def bytes: ByteVector = RawTransactionMessageSerializer.write(this) override def toString(): String = s"TransactionMessage(${transaction.txIdBE})" @@ -468,14 +463,8 @@ trait TransactionMessage extends DataPayload { */ object TransactionMessage extends Factory[TransactionMessage] { - private case class TransactionMessageImpl(transaction: Transaction) - extends TransactionMessage - def fromBytes(bytes: ByteVector): TransactionMessage = RawTransactionMessageSerializer.read(bytes) - - def apply(transaction: Transaction): TransactionMessage = - TransactionMessageImpl(transaction) } /** @@ -617,6 +606,12 @@ object FilterAddMessage extends Factory[FilterAddMessage] { element: ByteVector): FilterAddMessage = { FilterAddMessageImpl(elementSize, element) } + + /** Constructs a `FilterAddMessage` from the given hash digest */ + def fromHash(hash: HashDigest): FilterAddMessage = { + FilterAddMessageImpl(CompactSizeUInt(UInt64(hash.bytes.length)), hash.bytes) + } + } /** diff --git a/node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala b/node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala new file mode 100644 index 0000000000..c8fb0b4bc6 --- /dev/null +++ b/node-test/src/test/scala/org/bitcoins/node/UpdateBloomFilterTest.scala @@ -0,0 +1,189 @@ +package org.bitcoins.node + +import org.bitcoins.testkit.wallet.BitcoinSWalletTest +import org.scalatest.FutureOutcome +import org.bitcoins.node.models.Peer +import org.bitcoins.chain.models.BlockHeaderDAO +import org.bitcoins.chain.blockchain.ChainHandler +import org.bitcoins.testkit.node.NodeTestUtil +import org.bitcoins.chain.config.ChainAppConfig +import org.bitcoins.node.config.NodeAppConfig +import org.bitcoins.node.networking.peer.DataMessageHandler +import org.bitcoins.core.protocol.transaction.Transaction +import scala.concurrent._ +import scala.concurrent.duration._ +import org.scalatest.compatible.Assertion +import org.bitcoins.core.currency._ +import scala.util.Try +import akka.actor.Cancellable +import org.scalatest.run +import org.scalatest.exceptions.TestFailedException +import org.bitcoins.core.wallet.fee.SatoshisPerByte + +class UpdateBloomFilterTest extends BitcoinSWalletTest { + override type FixtureParam = WalletWithBitcoind + + def withFixture(test: OneArgAsyncTest): FutureOutcome = + withFundedWalletAndBitcoind(test) + + it must "update the bloom filter with an address" in { param => + val WalletWithBitcoind(wallet, rpc) = param + implicit val chainConf: ChainAppConfig = config + implicit val nodeConf: NodeAppConfig = config + + val assertionP = Promise[Assertion] + val assertionF = assertionP.future + + // we want to schedule a runnable that aborts + // the test after a timeout, but then + // we need to cancel that runnable once + // we get a result + var cancelable: Option[Cancellable] = None + val timeout = 15.seconds + + for { + _ <- config.initialize() + + firstBloom <- wallet.getBloomFilter() + + // this has to be generated after our bloom filter + // is calculated + addressFromWallet <- wallet.getNewAddress() + + spv <- { + val callback = SpvNodeCallbacks.onTxReceived { tx => + rpc.getRawTransaction(tx.txIdBE).foreach { res => + val paysToOurAddress = + // we check if any of the addresses in the TX + // pays to our wallet address + res.vout.exists(_.scriptPubKey.addresses match { + case None => false + case Some(addresses) => addresses.exists(_ == addressFromWallet) + }) + cancelable.forall(_.cancel()) + assertionP.complete { + Try { + assert(paysToOurAddress) + } + } + } + } + + val peer = Peer.fromBitcoind(rpc.instance) + val chain = { + val dao = BlockHeaderDAO() + ChainHandler(dao) + } + val spv = + SpvNode(peer, chain, bloomFilter = firstBloom, callbacks = callback) + spv.start() + } + _ <- spv.sync() + _ <- NodeTestUtil.awaitSync(spv, rpc) + + _ = spv.updateBloomFilter(addressFromWallet) + _ = { + val runnable = new Runnable { + override def run: Unit = { + assertionP.failure( + new TestFailedException( + s"Did not receive a TX message after $timeout!", + failedCodeStackDepth = 0)) + } + } + cancelable = Some { + actorSystem.scheduler.scheduleOnce(timeout, runnable) + } + } + _ <- rpc.sendToAddress(addressFromWallet, 1.bitcoin) + assertion <- assertionF + } yield assertion + } + + it must "update the bloom filter with a TX" in { param => + val WalletWithBitcoind(wallet, rpc) = param + implicit val chainConf: ChainAppConfig = config + implicit val nodeConf: NodeAppConfig = config + + val assertionP = Promise[Assertion] + val assertionF = assertionP.future + + // we want to schedule a runnable that aborts + // the test after a timeout, but then + // we need to cancel that runnable once + // we get a result + var cancelable: Option[Cancellable] = None + + // the TX we sent from our wallet to bitcoind, + // we expect to get notified once this is + // confirmed + var txFromWallet: Option[Transaction] = None + val timeout = 15.seconds + + for { + _ <- config.initialize() + + firstBloom <- wallet.getBloomFilter() + + spv <- { + val callback = SpvNodeCallbacks.onMerkleBlockReceived { (block, txs) => + val isFromOurWallet = txFromWallet.exists(tx => txs.contains(tx)) + // we might receive more merkle blocks than just the + // one for our TX + if (isFromOurWallet) { + assertionP.success(assert(isFromOurWallet)) + } + } + + val peer = Peer.fromBitcoind(rpc.instance) + val chain = { + val dao = BlockHeaderDAO() + ChainHandler(dao) + } + val spv = + SpvNode(peer, chain, bloomFilter = firstBloom, callbacks = callback) + spv.start() + } + _ <- spv.sync() + _ <- NodeTestUtil.awaitSync(spv, rpc) + + addressFromBitcoind <- rpc.getNewAddress + tx <- wallet + .sendToAddress(addressFromBitcoind, + 5.bitcoin, + SatoshisPerByte(100.sats)) + .map { tx => + txFromWallet = Some(tx) + tx + } + + _ = { + val _ = spv.broadcastTransaction(tx) + val SpvNode(_, _, newBloom, _) = spv.updateBloomFilter(tx) + assert(newBloom.contains(tx.txId)) + + cancelable = Some { + actorSystem.scheduler.scheduleOnce( + timeout, + new Runnable { + override def run: Unit = { + if (!assertionP.isCompleted) + assertionP.failure( + new TestFailedException( + s"Did not receive a merkle block message after $timeout!", + failedCodeStackDepth = 0)) + } + } + ) + } + } + // this should confirm our TX + // since we updated the bloom filter + // we should get notified about the block + _ <- rpc.getNewAddress.flatMap(rpc.generateToAddress(1, _)) + + assertion <- assertionF + } yield assertion + + } +} diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/peer/MerkleBuffersTest.scala b/node-test/src/test/scala/org/bitcoins/node/networking/peer/MerkleBuffersTest.scala new file mode 100644 index 0000000000..87e4064713 --- /dev/null +++ b/node-test/src/test/scala/org/bitcoins/node/networking/peer/MerkleBuffersTest.scala @@ -0,0 +1,82 @@ +package org.bitcoins.node.networking.peer + +import org.bitcoins.testkit.util.BitcoinSUnitTest +import org.bitcoins.testkit.Implicits._ +import org.bitcoins.core.protocol.blockchain.MerkleBlock +import org.bitcoins.testkit.core.gen.BlockchainElementsGenerator +import org.bitcoins.testkit.core.gen.TransactionGenerators +import org.scalacheck.Gen +import org.bitcoins.core.protocol.transaction.Transaction +import org.bitcoins.core.protocol.blockchain.Block +import _root_.org.scalatest.compatible.Assertion +import scala.concurrent.Future +import scala.util.Success +import scala.util.Try +import scala.util.Failure + +class MerkleBuffersTest extends BitcoinSUnitTest { + behavior of "MerkleBuffers" + + /** Generating blocks and transactions take a little while, + * this is to prevent the test from taking a _really_ long + * time + */ + implicit override val generatorDrivenConfig: PropertyCheckConfiguration = + customGenDrivenConfig(executions = 3) + + it must "match a merkle block with its corresponding transactions" in { + + val txsAndBlockGen: Gen[(Seq[Transaction], Seq[Transaction], Block)] = for { + txs <- Gen.nonEmptyListOf(TransactionGenerators.transaction) + otherTxs <- Gen.nonEmptyListOf(TransactionGenerators.transaction) + block <- BlockchainElementsGenerator.block(txs) + } yield (txs, otherTxs, block) + + forAll(txsAndBlockGen) { + + case (txs, otherTxs, block) => + var receivedExpectedTXs: Option[Try[Assertion]] = None + var callbackCount: Int = 0 + val callback: DataMessageHandler.OnMerkleBlockReceived = { + (_, merkleTxs) => + receivedExpectedTXs = Some( + Try( + assert(txs == merkleTxs, + "Received TXs in callback was not the ones we put in"))) + callbackCount = callbackCount + 1 + } + + val merkle = MerkleBlock(block, txs.map(_.txId)) + val _ = MerkleBuffers.putMerkle(merkle) + + txs.map { tx => + val matches = MerkleBuffers.putTx(tx, Seq(callback)) + assert( + matches, + s"TX ${tx.txIdBE} did not match any merkle block in MerkleBuffers") + } + + otherTxs.map { tx => + val matches = MerkleBuffers.putTx(tx, Seq(callback)) + assert( + !matches, + s"Unrelated TX ${tx.txIdBE} did match merkle block in MerkleBuffers") + + } + + assert(callbackCount != 0, + "Callback was not called after processing all TXs!") + + assert(callbackCount == 1, + s"Callback was called multiple times: $callbackCount") + + receivedExpectedTXs match { + case None => fail("Callback was never called") + case Some(Success(assertion)) => assertion + case Some(Failure(exc)) => fail(exc) + } + + } + + } +} diff --git a/node/src/main/scala/org/bitcoins/node/SpvNode.scala b/node/src/main/scala/org/bitcoins/node/SpvNode.scala index 3816b6b860..9f288ec93f 100644 --- a/node/src/main/scala/org/bitcoins/node/SpvNode.scala +++ b/node/src/main/scala/org/bitcoins/node/SpvNode.scala @@ -12,7 +12,6 @@ import org.bitcoins.rpc.util.AsyncUtil import scala.concurrent.Future import org.bitcoins.core.bloom.BloomFilter -import org.bitcoins.core.p2p.FilterLoadMessage import org.bitcoins.core.p2p.NetworkPayload import org.bitcoins.core.protocol.transaction.Transaction import org.bitcoins.node.models.BroadcastAbleTransaction @@ -22,6 +21,7 @@ import scala.util.Failure import scala.util.Success import org.bitcoins.db.P2PLogger import org.bitcoins.node.config.NodeAppConfig +import org.bitcoins.core.protocol.BitcoinAddress case class SpvNode( peer: Peer, @@ -44,6 +44,36 @@ case class SpvNode( PeerMessageSender(client) } + /** Updates our bloom filter to match the given TX + * + * @return SPV node with the updated bloom filter + */ + def updateBloomFilter(transaction: Transaction): SpvNode = { + logger.info(s"Updating bloom filter with transaction=${transaction.txIdBE}") + val newBloom = bloomFilter.update(transaction) + + // we could send filteradd messages, but we would + // then need to calculate all the new elements in + // the filter. this is easier:-) + peerMsgSender.sendFilterClearMessage() + peerMsgSender.sendFilterLoadMessage(newBloom) + + copy(bloomFilter = newBloom) + } + + /** Updates our bloom filter to match the given address + * + * @return SPV node with the updated bloom filter + */ + def updateBloomFilter(address: BitcoinAddress): SpvNode = { + logger.info(s"Updating bloom filter with address=$address") + val hash = address.hash + val newBloom = bloomFilter.insert(hash) + peerMsgSender.sendFilterAddMessage(hash) + + copy(bloomFilter = newBloom) + } + /** * Sends the given P2P to our peer. * This method is useful for playing around @@ -74,8 +104,7 @@ case class SpvNode( } } yield { logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer") - val filterMsg = FilterLoadMessage(bloomFilter) - val _ = send(filterMsg) + val _ = peerMsgSender.sendFilterLoadMessage(bloomFilter) node } } diff --git a/node/src/main/scala/org/bitcoins/node/SpvNodeCallbacks.scala b/node/src/main/scala/org/bitcoins/node/SpvNodeCallbacks.scala index ec9ce11647..a76daab97e 100644 --- a/node/src/main/scala/org/bitcoins/node/SpvNodeCallbacks.scala +++ b/node/src/main/scala/org/bitcoins/node/SpvNodeCallbacks.scala @@ -16,6 +16,18 @@ case class SpvNodeCallbacks( object SpvNodeCallbacks { + /** Constructs a set of callbacks that only acts on TX received */ + def onTxReceived(f: OnTxReceived): SpvNodeCallbacks = + SpvNodeCallbacks(onTxReceived = Seq(f)) + + /** Constructs a set of callbacks that only acts on block received */ + def onBlockReceived(f: OnBlockReceived): SpvNodeCallbacks = + SpvNodeCallbacks(onBlockReceived = Seq(f)) + + /** Constructs a set of callbacks that only acts on merkle block received */ + def onMerkleBlockReceived(f: OnMerkleBlockReceived): SpvNodeCallbacks = + SpvNodeCallbacks(onMerkleBlockReceived = Seq(f)) + /** Empty callbacks that does nothing with the received data */ val empty: SpvNodeCallbacks = SpvNodeCallbacks( diff --git a/node/src/main/scala/org/bitcoins/node/networking/P2PClient.scala b/node/src/main/scala/org/bitcoins/node/networking/P2PClient.scala index adc7150c82..3b917e0e55 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/P2PClient.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/P2PClient.scala @@ -287,10 +287,10 @@ object P2PClient extends P2PLogger { remainingBytes.length) loop(newRemainingBytes, message :: accum) } - case Failure(exception) => + case Failure(exc) => logger.error( - "Failed to parse network message, could be because TCP frame isn't aligned", - exception) + s"Failed to parse network message, could be because TCP frame isn't aligned: $exc") + //this case means that our TCP frame was not aligned with bitcoin protocol //return the unaligned bytes so we can apply them to the next tcp frame of bytes we receive //http://stackoverflow.com/a/37979529/967713 diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index 1c807db7b2..30a5163cff 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -19,6 +19,7 @@ import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.core.p2p.TypeIdentifier import org.bitcoins.core.p2p.MsgUnassigned import org.bitcoins.db.P2PLogger +import org.bitcoins.core.p2p.Inventory /** This actor is meant to handle a [[org.bitcoins.core.p2p.DataPayload DataPayload]] * that a peer to sent to us on the p2p network, for instance, if we a receive a @@ -29,9 +30,6 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)( appConfig: NodeAppConfig) extends P2PLogger { - private val callbackNum = callbacks.onBlockReceived.length + callbacks.onMerkleBlockReceived.length + callbacks.onTxReceived.length - logger.debug(s"Given $callbackNum of callback(s)") - private val txDAO = BroadcastAbleTransactionDAO(SQLiteProfile) def handleDataPayload( @@ -68,29 +66,61 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)( } FutureUtil.unit - case headersMsg: HeadersMessage => + case HeadersMessage(count, headers) => + logger.debug(s"Received headers message with ${count.toInt} headers") logger.trace( - s"Received headers message with ${headersMsg.count.toInt} headers") - val headers = headersMsg.headers + s"Received headers=${headers.map(_.hashBE.hex).mkString("[", ",", "]")}") val chainApiF = chainHandler.processHeaders(headers) - chainApiF.map { newApi => - val lastHeader = headers.last - val lastHash = lastHeader.hash - newApi.getBlockCount.map { count => - logger.trace( - s"Processed headers, most recent has height=$count and hash=$lastHash.") + logger.trace(s"Requesting data for headers=${headers.length}") + peerMsgSender.sendGetDataMessage(headers: _*) + + chainApiF + .map { newApi => + if (headers.nonEmpty) { + + val lastHeader = headers.last + val lastHash = lastHeader.hash + newApi.getBlockCount.map { count => + logger.trace( + s"Processed headers, most recent has height=$count and hash=$lastHash.") + } + + if (count.toInt == HeadersMessage.MaxHeadersCount) { + logger.error( + s"Received maximum amount of headers in one header message. This means we are not synced, requesting more") + peerMsgSender.sendGetHeadersMessage(lastHash) + } else { + logger.debug( + List(s"Received headers=${count.toInt} in one message,", + "which is less than max. This means we are synced,", + "not requesting more.") + .mkString(" ")) + } + + } + } + .failed + .map { err => + logger.error(s"Error when processing headers message", err) } - peerMsgSender.sendGetHeadersMessage(lastHash) - } case msg: BlockMessage => Future { callbacks.onBlockReceived.foreach(_.apply(msg.block)) } - case msg: TransactionMessage => - Future { callbacks.onTxReceived.foreach(_.apply(msg.transaction)) } - case msg: MerkleBlockMessage => - Future { - callbacks.onMerkleBlockReceived.foreach(_.apply(msg.merkleBlock)) + case TransactionMessage(tx) => + val belongsToMerkle = + MerkleBuffers.putTx(tx, callbacks.onMerkleBlockReceived) + if (belongsToMerkle) { + logger.trace( + s"Transaction=${tx.txIdBE} belongs to merkleblock, not calling callbacks") + FutureUtil.unit + } else { + logger.trace( + s"Transaction=${tx.txIdBE} does not belong to merkleblock, processing given callbacks") + Future { callbacks.onTxReceived.foreach(_.apply(tx)) } } + case MerkleBlockMessage(merkleBlock) => + MerkleBuffers.putMerkle(merkleBlock) + FutureUtil.unit case invMsg: InventoryMessage => handleInventoryMsg(invMsg = invMsg, peerMsgSender = peerMsgSender) } @@ -100,7 +130,11 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)( invMsg: InventoryMessage, peerMsgSender: PeerMessageSender): Future[Unit] = { logger.info(s"Received inv=${invMsg}") - val getData = GetDataMessage(invMsg.inventories) + val getData = GetDataMessage(invMsg.inventories.map { + case Inventory(TypeIdentifier.MsgBlock, hash) => + Inventory(TypeIdentifier.MsgFilteredBlock, hash) + case other: Inventory => other + }) peerMsgSender.sendMsg(getData) FutureUtil.unit @@ -112,8 +146,8 @@ object DataMessageHandler { /** Callback for handling a received block */ type OnBlockReceived = Block => Unit - /** Callback for handling a received Merkle block */ - type OnMerkleBlockReceived = MerkleBlock => Unit + /** Callback for handling a received Merkle block with its corresponding TXs */ + type OnMerkleBlockReceived = (MerkleBlock, Vector[Transaction]) => Unit /** Callback for handling a received transaction */ type OnTxReceived = Transaction => Unit diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/MerkleBuffers.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/MerkleBuffers.scala new file mode 100644 index 0000000000..1f8a58b323 --- /dev/null +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/MerkleBuffers.scala @@ -0,0 +1,115 @@ +package org.bitcoins.node.networking.peer + +import org.bitcoins.core.util.BitcoinSLogger +import scala.collection.mutable +import org.bitcoins.core.protocol.blockchain.MerkleBlock +import org.bitcoins.core.protocol.transaction.Transaction + +/** + * A buffer of merkleblocks and the transactions associated with them. + * + * When receiving a merkleblock message over the P2P network, the + * corresponding transactions are sent immediately after. That means + * we have to correlate the received merkleblocks with the matching + * transactions. + * + * This buffer is responsible for calling the approriate callbacks + * once a merkle block has received all its transactions. + */ +private[peer] object MerkleBuffers extends BitcoinSLogger { + private type MerkleBlocksWithTransactions = + mutable.Map[MerkleBlock, mutable.Builder[Transaction, Vector[Transaction]]] + + private val underlyingMap: MerkleBlocksWithTransactions = mutable.Map.empty + + /** Adds the given merkleblock to the buffer */ + def putMerkle(merkle: MerkleBlock): Unit = { + val tree = merkle.partialMerkleTree + val matches = tree.extractMatches + + logger.debug(s"Received merkle block, expecting ${matches.length} TX(s)") + + if (matches.nonEmpty) { + logger.trace(s"Adding merkleBlock=${merkle.blockHeader.hashBE} to buffer") + underlyingMap.put(merkle, + // it's important to use a collection + // type that can call .result() without + // clearing the builder + Vector.newBuilder) + } else { + logger.trace( + s"Merkleblock=${merkle.blockHeader.hashBE} has no matches, not adding to buffer") + } + + () + } + + /** Attempts to add the given transaction to a corresponding + * merkleblock in the buffer. + * + * @param tx The transaction to (maybe) add to the buffer + * @param callbacks The callbacks to execute if we're + * finished processing a merkleblock + * + * @return If the transaction matches a merkle block, returns true. + * Otherwise, false. + */ + def putTx( + tx: Transaction, + callbacks: Seq[DataMessageHandler.OnMerkleBlockReceived]): Boolean = { + val blocksInBuffer = underlyingMap.keys.toList + logger.trace(s"Looking for transaction=${tx.txIdBE} in merkleblock buffer") + logger.trace(s"Merkleblocks in buffer: ${blocksInBuffer.length}") + blocksInBuffer.find { block => + val matches = block.partialMerkleTree.extractMatches + + logger.trace( + s"Block=${block.blockHeader.hashBE} has matches=${matches.map(_.flip)}") + + matches.exists(_ == tx.txId) + } match { + case None => + logger.debug( + s"Transaction=${tx.txIdBE} does not belong to any merkle block") + false + case Some(key) => + handleMerkleMatch(tx, merkleBlock = key, callbacks = callbacks) + } + } + + // TODO Scaladoc + private def handleMerkleMatch( + transaction: Transaction, + merkleBlock: MerkleBlock, + callbacks: Seq[DataMessageHandler.OnMerkleBlockReceived]) = { + val merkleBlockMatches = merkleBlock.partialMerkleTree.extractMatches + val merkleHash = merkleBlock.blockHeader.hashBE + + val txHash = transaction.txIdBE + + logger.debug(s"Transaction=$txHash matched merkleBlock=$merkleHash") + + logger.trace(s"Adding transaction=$txHash to buffer") + val builder = underlyingMap(merkleBlock) // TODO: error handling + builder += transaction + + val transactionSoFar = builder.result() + val transactionSoFarCount = transactionSoFar.length + val matchesCount = merkleBlockMatches.length + if (transactionSoFarCount == matchesCount) { + logger.debug( + s"We've received all transactions ($transactionSoFarCount) for merkleBlock=$merkleHash") + + logger.trace(s"Removing merkle block from buffer") + underlyingMap.remove(merkleBlock) // TODO: error handling + + logger.trace(s"Calling merkle block callback(s)") + callbacks.foreach(_.apply(merkleBlock, transactionSoFar)) + } else { + logger.trace( + s"We've received $transactionSoFarCount, expecting $matchesCount") + assert(transactionSoFarCount < matchesCount) + } + true + } +} diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageSender.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageSender.scala index 3916166d56..80ea4aca72 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageSender.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageSender.scala @@ -9,6 +9,9 @@ import org.bitcoins.node.networking.P2PClient import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.core.protocol.transaction.Transaction import org.bitcoins.db.P2PLogger +import org.bitcoins.core.crypto.HashDigest +import org.bitcoins.core.bloom.BloomFilter +import org.bitcoins.core.protocol.blockchain.BlockHeader case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig) extends P2PLogger { @@ -67,12 +70,38 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig) sendMsg(message) } + def sendFilterClearMessage(): Unit = { + sendMsg(FilterClearMessage) + } + + def sendFilterAddMessage(hash: HashDigest): Unit = { + val message = FilterAddMessage.fromHash(hash) + logger.trace(s"Sending filteradd=$message to peer=${client.peer}") + sendMsg(message) + } + + def sendFilterLoadMessage(bloom: BloomFilter): Unit = { + val message = FilterLoadMessage(bloom) + logger.trace(s"Sending filterload=$message to peer=${client.peer}") + sendMsg(message) + } + def sendTransactionMessage(transaction: Transaction): Unit = { val message = TransactionMessage(transaction) logger.trace(s"Sending txmessage=$message to peer=${client.peer}") sendMsg(message) } + /** Sends a request for filtered blocks matching the given headers */ + def sendGetDataMessage(headers: BlockHeader*): Unit = { + val inventories = + headers.map(header => + Inventory(TypeIdentifier.MsgFilteredBlock, header.hash)) + val message = GetDataMessage(inventories) + logger.info(s"Sending getdata=$message to peer=${client.peer}") + sendMsg(message) + } + private[node] def sendMsg(msg: NetworkPayload): Unit = { logger.debug(s"Sending msg=${msg.commandName} to peer=${socket}") val newtworkMsg = NetworkMessage(conf.network, msg) diff --git a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestUtil.scala b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestUtil.scala index cfd8294ca4..88fdcaaa88 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestUtil.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestUtil.scala @@ -19,7 +19,7 @@ import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import akka.actor.ActorSystem import org.bitcoins.core.util.BitcoinSLogger -import org.bitcoins.rpc.util.AsyncUtil +import org.bitcoins.testkit.async.TestAsyncUtil import org.bitcoins.core.bloom.BloomFilter import org.bitcoins.core.bloom.BloomUpdateAll @@ -116,12 +116,30 @@ abstract class NodeTestUtil extends BitcoinSLogger { } } - /** Awaits sync between the given SPV node and bitcoind client */ + /** Checks if the given light client and bitcoind + * has the same number of blocks in their blockchains + */ + def isSameBlockCount(spv: SpvNode, rpc: BitcoindRpcClient)( + implicit ec: ExecutionContext): Future[Boolean] = { + val rpcCountF = rpc.getBlockCount + val spvCountF = spv.chainApi.getBlockCount + for { + spvCount <- spvCountF + rpcCount <- rpcCountF + } yield rpcCount == spvCount + } + + /** Awaits sync between the given SPV node and bitcoind client + * + * TODO: We should check for hash, not block height. however, + * our way of determining what the best hash is when having + * multiple tips is not good enough yet + */ def awaitSync(node: SpvNode, rpc: BitcoindRpcClient)( implicit sys: ActorSystem): Future[Unit] = { import sys.dispatcher - AsyncUtil - .retryUntilSatisfiedF(() => isSameBestHash(node, rpc), 500.milliseconds) + TestAsyncUtil + .retryUntilSatisfiedF(() => isSameBlockCount(node, rpc), 500.milliseconds) } } diff --git a/wallet-test/src/test/scala/org/bitcoins/wallet/WalletBloomTest.scala b/wallet-test/src/test/scala/org/bitcoins/wallet/WalletBloomTest.scala index 08d99cb85d..2292eff1c5 100644 --- a/wallet-test/src/test/scala/org/bitcoins/wallet/WalletBloomTest.scala +++ b/wallet-test/src/test/scala/org/bitcoins/wallet/WalletBloomTest.scala @@ -15,7 +15,7 @@ class WalletBloomTest extends BitcoinSWalletTest { override type FixtureParam = WalletWithBitcoind override def withFixture(test: OneArgAsyncTest): FutureOutcome = - withNewWalletAndBitcoind(test) + withFundedWalletAndBitcoind(test) it should "generate a bloom filter that matches the pubkeys in our wallet" in { param => @@ -32,19 +32,12 @@ class WalletBloomTest extends BitcoinSWalletTest { } } - // TODO: change fixture to withFundedWalletAndBitcoind once #577 goes in - // https://github.com/bitcoin-s/bitcoin-s/pull/577/files#diff-0fb6ac004fe1e550b7c13258d7d0706cR154 it should "generate a bloom filter that matches the outpoints in our wallet" in { param => val WalletWithBitcoind(walletApi, bitcoind) = param val wallet = walletApi.asInstanceOf[Wallet] for { - address <- wallet.getNewAddress() - tx <- bitcoind - .sendToAddress(address, 5.bitcoins) - .flatMap(bitcoind.getRawTransaction(_)) - _ <- wallet.processTransaction(tx.hex, confirmations = 0) outpoints <- wallet.listOutpoints() bloom <- wallet.getBloomFilter() diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala index 5a422bf803..52339befb3 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/TransactionProcessing.scala @@ -28,7 +28,7 @@ private[wallet] trait TransactionProcessing extends KeyHandlingLogger { logger.info( s"Processing transaction=${transaction.txIdBE} with confirmations=$confirmations") processTransactionImpl(transaction, confirmations).map { - case ProcessTxResult(outgoing, incoming) => + case ProcessTxResult(incoming, outgoing) => logger.info( s"Finished processing of transaction=${transaction.txIdBE}. Relevant incomingTXOs=${incoming.length}, outgoingTXOs=${outgoing.length}") this