Add functionality for updating SPV node bloom filter (#585)

* Add functionality for updating SPV node bloom filter

* Add SPV node shutdown to exit hook

* Clean up traits/case classes

* Change fixture in WalletBloomTest

* Fix logging bug in TransactionProcessing

* Add MerkleBuffers

In this commit we add MerkleBuffers, which is an object
that lets us aggreagate merkleblocks with their corresponding
transactions before sending them out. This is global, mutable
state (bad!) but it's a working solution for now;

* Use TestAsyncUtil

* Add MerkleBuffers test

* Send getdata if receiving single header

* Change awaitSync to use block count

* Fix UpdateBloomFilterTest

* Add more logging of chain/headers validation

* Send getdata for all blocks

* Nits: Scaladocs, comments toString
This commit is contained in:
Torkel Rogstad 2019-08-02 16:22:20 +02:00 committed by Chris Stewart
parent a76f61f97c
commit 46280c9e59
16 changed files with 643 additions and 128 deletions

View file

@ -43,11 +43,6 @@ object Main extends App {
implicit val system = ActorSystem("bitcoin-s") implicit val system = ActorSystem("bitcoin-s")
import system.dispatcher import system.dispatcher
sys.addShutdownHook {
logger.error(s"Exiting process")
system.terminate().foreach(_ => logger.info(s"Actor system terminated"))
}
/** Log the given message, shut down the actor system and quit. */ /** Log the given message, shut down the actor system and quit. */
def error(message: Any): Nothing = { def error(message: Any): Nothing = {
logger.error(s"FATAL: $message") logger.error(s"FATAL: $message")
@ -128,7 +123,17 @@ object Main extends App {
Seq(walletRoutes, nodeRoutes, chainRoutes)) Seq(walletRoutes, nodeRoutes, chainRoutes))
server.start() server.start()
} }
} yield start } yield {
sys.addShutdownHook {
logger.error(s"Exiting process")
node.stop().foreach(_ => logger.info(s"Stopped SPV node"))
system.terminate().foreach(_ => logger.info(s"Actor system terminated"))
}
start
}
startFut.failed.foreach { err => startFut.failed.foreach { err =>
logger.info(s"Error on server startup!", err) logger.info(s"Error on server startup!", err)

View file

@ -63,6 +63,8 @@ object Blockchain extends ChainVerificationLogger {
def connectTip(header: BlockHeader, blockHeaderDAO: BlockHeaderDAO)( def connectTip(header: BlockHeader, blockHeaderDAO: BlockHeaderDAO)(
implicit ec: ExecutionContext, implicit ec: ExecutionContext,
conf: ChainAppConfig): Future[BlockchainUpdate] = { conf: ChainAppConfig): Future[BlockchainUpdate] = {
logger.debug(
s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain")
//get all competing chains we have //get all competing chains we have
val blockchainsF: Future[Vector[Blockchain]] = val blockchainsF: Future[Vector[Blockchain]] =
@ -76,8 +78,8 @@ object Blockchain extends ChainVerificationLogger {
blockchain.find(_.hashBE == header.previousBlockHashBE) blockchain.find(_.hashBE == header.previousBlockHashBE)
prevBlockHeaderOpt match { prevBlockHeaderOpt match {
case None => case None =>
logger.debug( logger.warn(
s"No common ancestor found in the chain to connect to ${header.hashBE}") s"No common ancestor found in the chain to connect header=${header.hashBE.hex}")
val err = TipUpdateResult.BadPreviousBlockHash(header) val err = TipUpdateResult.BadPreviousBlockHash(header)
val failed = BlockchainUpdate.Failed(blockchain = blockchain, val failed = BlockchainUpdate.Failed(blockchain = blockchain,
failedHeader = header, failedHeader = header,
@ -86,8 +88,8 @@ object Blockchain extends ChainVerificationLogger {
case Some(prevBlockHeader) => case Some(prevBlockHeader) =>
//found a header to connect to! //found a header to connect to!
logger.debug( logger.trace(
s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain") s"Found ancestor=${prevBlockHeader.hashBE.hex} for header=${header.hashBE.hex}")
val tipResultF = val tipResultF =
TipValidation.checkNewTip(newPotentialTip = header, TipValidation.checkNewTip(newPotentialTip = header,
currentTip = prevBlockHeader, currentTip = prevBlockHeader,
@ -96,10 +98,14 @@ object Blockchain extends ChainVerificationLogger {
tipResultF.map { tipResult => tipResultF.map { tipResult =>
tipResult match { tipResult match {
case TipUpdateResult.Success(headerDb) => case TipUpdateResult.Success(headerDb) =>
logger.debug(
s"Successfully verified=${headerDb.hashBE.hex}, connecting to chain")
val newChain = val newChain =
Blockchain.fromHeaders(headerDb +: blockchain.headers) Blockchain.fromHeaders(headerDb +: blockchain.headers)
BlockchainUpdate.Successful(newChain, headerDb) BlockchainUpdate.Successful(newChain, headerDb)
case fail: TipUpdateResult.Failure => case fail: TipUpdateResult.Failure =>
logger.warn(
s"Could not verify header=${header.hashBE.hex}, reason=$fail")
BlockchainUpdate.Failed(blockchain, header, fail) BlockchainUpdate.Failed(blockchain, header, fail)
} }
} }

View file

@ -8,6 +8,11 @@ import org.bitcoins.core.protocol.blockchain.BlockHeader
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}
import org.bitcoins.db.ChainVerificationLogger import org.bitcoins.db.ChainVerificationLogger
import org.bitcoins.chain.validation.TipUpdateResult.BadNonce
import org.bitcoins.chain.validation.TipUpdateResult.BadPOW
import org.bitcoins.chain.validation.TipUpdateResult.BadPreviousBlockHash
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.chain.validation.TipUpdateResult
/** /**
* Chain Handler is meant to be the reference implementation * Chain Handler is meant to be the reference implementation
@ -41,6 +46,8 @@ case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)(
override def processHeader(header: BlockHeader)( override def processHeader(header: BlockHeader)(
implicit ec: ExecutionContext): Future[ChainHandler] = { implicit ec: ExecutionContext): Future[ChainHandler] = {
logger.debug(
s"Processing header=${header.hashBE.hex}, previousHash=${header.previousBlockHashBE.hex}")
val blockchainUpdateF = val blockchainUpdateF =
Blockchain.connectTip(header, blockHeaderDAO) Blockchain.connectTip(header, blockHeaderDAO)
@ -52,14 +59,17 @@ case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)(
val createdF = blockHeaderDAO.create(updatedHeader) val createdF = blockHeaderDAO.create(updatedHeader)
createdF.map { header => createdF.map { header =>
logger.debug( logger.debug(
s"Connected new header to blockchain, height=${header.height} hash=${header.hashBE}") s"Connected new header to blockchain, height=${header.height} hash=${header.hashBE.hex}")
ChainHandler(blockHeaderDAO) ChainHandler(blockHeaderDAO)
} }
case BlockchainUpdate.Failed(_, _, reason) => case BlockchainUpdate.Failed(_, _, reason) =>
val errMsg = val errMsg =
s"Failed to add header to chain, header=${header.hashBE.hex} reason=${reason}" s"Failed to add header to chain, header=${header.hashBE.hex} reason=${reason}"
logger.warn(errMsg) logger.warn(errMsg)
Future.failed(new RuntimeException(errMsg)) // potential chain split happening, let's log what's going on
logTipConnectionFailure(reason).flatMap { _ =>
Future.failed(new RuntimeException(errMsg))
}
} }
blockchainUpdateF.failed.foreach { err => blockchainUpdateF.failed.foreach { err =>
@ -71,6 +81,32 @@ case class ChainHandler(blockHeaderDAO: BlockHeaderDAO)(
newHandlerF newHandlerF
} }
/** Logs a tip connection failure by querying local chain state
* and comparing it to the received `TipUpdateResult`
*/
private def logTipConnectionFailure(failure: TipUpdateResult.Failure)(
implicit ec: ExecutionContext): Future[Unit] = {
failure match {
case _ @(_: BadPOW | _: BadNonce) =>
// TODO: Log this in a meaningful way
FutureUtil.unit
case _: BadPreviousBlockHash =>
blockHeaderDAO.chainTips.map { tips =>
if (tips.length > 1) {
logger.warn {
s"We have multiple (${tips.length}) , competing chainTips=${tips
.map(_.hashBE.hex)
.mkString("[", ",", "]")}"
}
} else {
logger.warn(
s"We don't have competing chainTips. Most recent, valid header=${tips.head.hashBE.hex}")
}
}
}
}
/** /**
* @inheritdoc * @inheritdoc
*/ */

View file

@ -18,7 +18,10 @@ object TipUpdateResult {
} }
/** Means that [[org.bitcoins.core.protocol.blockchain.BlockHeader.previousBlockHashBE previousBlockHashBE]] was incorrect */ /** Means that [[org.bitcoins.core.protocol.blockchain.BlockHeader.previousBlockHashBE previousBlockHashBE]] was incorrect */
case class BadPreviousBlockHash(header: BlockHeader) extends Failure case class BadPreviousBlockHash(header: BlockHeader) extends Failure {
override def toString: String =
s"BadPreviousBlockHash(hash=${header.hashBE}, previous=${header.previousBlockHashBE})"
}
/** Means that [[org.bitcoins.core.protocol.blockchain.BlockHeader.nBits nBits]] was invalid */ /** Means that [[org.bitcoins.core.protocol.blockchain.BlockHeader.nBits nBits]] was invalid */
case class BadPOW(header: BlockHeader) extends Failure case class BadPOW(header: BlockHeader) extends Failure

View file

@ -8,40 +8,21 @@ import scodec.bits.ByteVector
/** /**
* These are used as unique identifiers inside the peer-to-peer network * These are used as unique identifiers inside the peer-to-peer network
*
* @param typeIdentifier The type of object which was hashed
* @param hash SHA256(SHA256()) hash of the object in internal byte order.
*
* @see [[https://bitcoin.org/en/developer-reference#term-inventory]] * @see [[https://bitcoin.org/en/developer-reference#term-inventory]]
*/ */
trait Inventory extends NetworkElement { case class Inventory(typeIdentifier: TypeIdentifier, hash: DoubleSha256Digest)
extends NetworkElement {
/**
* The type of object which was hashed
* @return
*/
def typeIdentifier: TypeIdentifier
/**
* SHA256(SHA256()) hash of the object in internal byte order.
* @return
*/
def hash: DoubleSha256Digest
override def bytes: ByteVector = RawInventorySerializer.write(this) override def bytes: ByteVector = RawInventorySerializer.write(this)
override def toString(): String = s"Inventory($typeIdentifier, $hash)"
} }
object Inventory extends Factory[Inventory] { object Inventory extends Factory[Inventory] {
private case class InventoryImpl(
typeIdentifier: TypeIdentifier,
hash: DoubleSha256Digest)
extends Inventory
override def fromBytes(bytes: ByteVector): Inventory = override def fromBytes(bytes: ByteVector): Inventory =
RawInventorySerializer.read(bytes) RawInventorySerializer.read(bytes)
def apply(
typeIdentifier: TypeIdentifier,
hash: DoubleSha256Digest): Inventory = {
InventoryImpl(typeIdentifier, hash)
}
} }

View file

@ -17,6 +17,7 @@ import org.bitcoins.core.config.NetworkParameters
import java.net.InetSocketAddress import java.net.InetSocketAddress
import org.bitcoins.core.number.UInt32 import org.bitcoins.core.number.UInt32
import org.bitcoins.core.bloom.BloomFlag import org.bitcoins.core.bloom.BloomFlag
import org.bitcoins.core.crypto.HashDigest
/** /**
* Trait that represents a payload for a message on the Bitcoin p2p network * Trait that represents a payload for a message on the Bitcoin p2p network
@ -43,29 +44,21 @@ sealed trait DataPayload extends NetworkPayload
/** /**
* The block message transmits a single serialized block * The block message transmits a single serialized block
* *
* @param block The block being transmitted inside of this message
*
* @see [[https://bitcoin.org/en/developer-reference#block]] * @see [[https://bitcoin.org/en/developer-reference#block]]
*/ */
trait BlockMessage extends DataPayload { case class BlockMessage(block: Block) extends DataPayload {
override val commandName = NetworkPayload.blockCommandName
/**
* The block being transmitted inside of this [[BlockMessage]]
*/
def block: Block
override def commandName = NetworkPayload.blockCommandName
override def bytes: ByteVector = RawBlockMessageSerializer.write(this) override def bytes: ByteVector = RawBlockMessageSerializer.write(this)
} }
object BlockMessage extends Factory[BlockMessage] { object BlockMessage extends Factory[BlockMessage] {
private case class BlockMessageImpl(block: Block) extends BlockMessage
def fromBytes(bytes: ByteVector): BlockMessage = def fromBytes(bytes: ByteVector): BlockMessage =
RawBlockMessageSerializer.read(bytes) RawBlockMessageSerializer.read(bytes)
def apply(block: Block): BlockMessage = BlockMessageImpl(block)
} }
/** /**
@ -147,41 +140,39 @@ object GetBlocksMessage extends Factory[GetBlocksMessage] {
* The getdata message requests one or more data objects from another node. * The getdata message requests one or more data objects from another node.
* The objects are requested by an inventory, * The objects are requested by an inventory,
* which the requesting node typically previously received by way of an inv message. * which the requesting node typically previously received by way of an inv message.
*
* @param inventoryCount The number of inventory enteries
* @param inventories One or more inventory entries up to a maximum of 50,000 entries.
*
* @see [[https://bitcoin.org/en/developer-reference#getdata]] * @see [[https://bitcoin.org/en/developer-reference#getdata]]
*/ */
trait GetDataMessage extends DataPayload { case class GetDataMessage(
inventoryCount: CompactSizeUInt,
/** inventories: Seq[Inventory])
* The number of inventory enteries extends DataPayload {
*/
def inventoryCount: CompactSizeUInt
/**
* One or more inventory entries up to a maximum of 50,000 entries.
*/
def inventories: Seq[Inventory]
override def commandName = NetworkPayload.getDataCommandName override def commandName = NetworkPayload.getDataCommandName
override def bytes: ByteVector = RawGetDataMessageSerializer.write(this) override def bytes: ByteVector = RawGetDataMessageSerializer.write(this)
override def toString(): String = {
val count = s"inventoryCount=${inventoryCount.toInt}"
val invs = s"inventories=${
val base = inventories.toString
val cutoff = 100
if (base.length() > cutoff) base.take(cutoff) + "..."
else base
}"
s"GetDataMessage($count, $invs)"
}
} }
object GetDataMessage extends Factory[GetDataMessage] { object GetDataMessage extends Factory[GetDataMessage] {
private case class GetDataMessageImpl(
inventoryCount: CompactSizeUInt,
inventories: Seq[Inventory])
extends GetDataMessage
override def fromBytes(bytes: ByteVector): GetDataMessage = { override def fromBytes(bytes: ByteVector): GetDataMessage = {
RawGetDataMessageSerializer.read(bytes) RawGetDataMessageSerializer.read(bytes)
} }
def apply(
inventoryCount: CompactSizeUInt,
inventories: Seq[Inventory]): GetDataMessage = {
GetDataMessageImpl(inventoryCount, inventories)
}
def apply(inventories: Seq[Inventory]): GetDataMessage = { def apply(inventories: Seq[Inventory]): GetDataMessage = {
val inventoryCount = CompactSizeUInt(UInt64(inventories.length)) val inventoryCount = CompactSizeUInt(UInt64(inventories.length))
GetDataMessage(inventoryCount, inventories) GetDataMessage(inventoryCount, inventories)
@ -294,9 +285,17 @@ case class HeadersMessage(count: CompactSizeUInt, headers: Vector[BlockHeader])
object HeadersMessage extends Factory[HeadersMessage] { object HeadersMessage extends Factory[HeadersMessage] {
/** The maximum amount of headers sent in one `headers` message
*
* @see [[https://bitcoin.org/en/developer-reference#getheaders bitcoin.org]]
* developer reference
*/
val MaxHeadersCount: Int = 2000
def fromBytes(bytes: ByteVector): HeadersMessage = def fromBytes(bytes: ByteVector): HeadersMessage =
RawHeadersMessageSerializer.read(bytes) RawHeadersMessageSerializer.read(bytes)
/** Constructs a `headers` message from the given headers */
def apply(headers: Vector[BlockHeader]): HeadersMessage = { def apply(headers: Vector[BlockHeader]): HeadersMessage = {
val count = CompactSizeUInt(UInt64(headers.length)) val count = CompactSizeUInt(UInt64(headers.length))
HeadersMessage(count, headers) HeadersMessage(count, headers)
@ -388,7 +387,7 @@ case object MemPoolMessage extends DataPayload {
*/ */
case class MerkleBlockMessage(merkleBlock: MerkleBlock) extends DataPayload { case class MerkleBlockMessage(merkleBlock: MerkleBlock) extends DataPayload {
override def commandName = NetworkPayload.merkleBlockCommandName override val commandName = NetworkPayload.merkleBlockCommandName
def bytes: ByteVector = RawMerkleBlockMessageSerializer.write(this) def bytes: ByteVector = RawMerkleBlockMessageSerializer.write(this)
@ -447,16 +446,12 @@ object NotFoundMessage extends Factory[NotFoundMessage] {
/** /**
* The tx message transmits a single transaction in the raw transaction format. * The tx message transmits a single transaction in the raw transaction format.
* It can be sent in a variety of situations; * It can be sent in a variety of situations;
* @param transaction The transaction being sent over the wire
* @see [[https://bitcoin.org/en/developer-reference#tx]] * @see [[https://bitcoin.org/en/developer-reference#tx]]
*/ */
trait TransactionMessage extends DataPayload { case class TransactionMessage(transaction: Transaction) extends DataPayload {
/** override val commandName = NetworkPayload.transactionCommandName
* The transaction being sent over the wire
*/
def transaction: Transaction
override def commandName = NetworkPayload.transactionCommandName
override def bytes: ByteVector = RawTransactionMessageSerializer.write(this) override def bytes: ByteVector = RawTransactionMessageSerializer.write(this)
override def toString(): String = s"TransactionMessage(${transaction.txIdBE})" override def toString(): String = s"TransactionMessage(${transaction.txIdBE})"
@ -468,14 +463,8 @@ trait TransactionMessage extends DataPayload {
*/ */
object TransactionMessage extends Factory[TransactionMessage] { object TransactionMessage extends Factory[TransactionMessage] {
private case class TransactionMessageImpl(transaction: Transaction)
extends TransactionMessage
def fromBytes(bytes: ByteVector): TransactionMessage = def fromBytes(bytes: ByteVector): TransactionMessage =
RawTransactionMessageSerializer.read(bytes) RawTransactionMessageSerializer.read(bytes)
def apply(transaction: Transaction): TransactionMessage =
TransactionMessageImpl(transaction)
} }
/** /**
@ -617,6 +606,12 @@ object FilterAddMessage extends Factory[FilterAddMessage] {
element: ByteVector): FilterAddMessage = { element: ByteVector): FilterAddMessage = {
FilterAddMessageImpl(elementSize, element) FilterAddMessageImpl(elementSize, element)
} }
/** Constructs a `FilterAddMessage` from the given hash digest */
def fromHash(hash: HashDigest): FilterAddMessage = {
FilterAddMessageImpl(CompactSizeUInt(UInt64(hash.bytes.length)), hash.bytes)
}
} }
/** /**

View file

@ -0,0 +1,189 @@
package org.bitcoins.node
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest.FutureOutcome
import org.bitcoins.node.models.Peer
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.testkit.node.NodeTestUtil
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.networking.peer.DataMessageHandler
import org.bitcoins.core.protocol.transaction.Transaction
import scala.concurrent._
import scala.concurrent.duration._
import org.scalatest.compatible.Assertion
import org.bitcoins.core.currency._
import scala.util.Try
import akka.actor.Cancellable
import org.scalatest.run
import org.scalatest.exceptions.TestFailedException
import org.bitcoins.core.wallet.fee.SatoshisPerByte
class UpdateBloomFilterTest extends BitcoinSWalletTest {
override type FixtureParam = WalletWithBitcoind
def withFixture(test: OneArgAsyncTest): FutureOutcome =
withFundedWalletAndBitcoind(test)
it must "update the bloom filter with an address" in { param =>
val WalletWithBitcoind(wallet, rpc) = param
implicit val chainConf: ChainAppConfig = config
implicit val nodeConf: NodeAppConfig = config
val assertionP = Promise[Assertion]
val assertionF = assertionP.future
// we want to schedule a runnable that aborts
// the test after a timeout, but then
// we need to cancel that runnable once
// we get a result
var cancelable: Option[Cancellable] = None
val timeout = 15.seconds
for {
_ <- config.initialize()
firstBloom <- wallet.getBloomFilter()
// this has to be generated after our bloom filter
// is calculated
addressFromWallet <- wallet.getNewAddress()
spv <- {
val callback = SpvNodeCallbacks.onTxReceived { tx =>
rpc.getRawTransaction(tx.txIdBE).foreach { res =>
val paysToOurAddress =
// we check if any of the addresses in the TX
// pays to our wallet address
res.vout.exists(_.scriptPubKey.addresses match {
case None => false
case Some(addresses) => addresses.exists(_ == addressFromWallet)
})
cancelable.forall(_.cancel())
assertionP.complete {
Try {
assert(paysToOurAddress)
}
}
}
}
val peer = Peer.fromBitcoind(rpc.instance)
val chain = {
val dao = BlockHeaderDAO()
ChainHandler(dao)
}
val spv =
SpvNode(peer, chain, bloomFilter = firstBloom, callbacks = callback)
spv.start()
}
_ <- spv.sync()
_ <- NodeTestUtil.awaitSync(spv, rpc)
_ = spv.updateBloomFilter(addressFromWallet)
_ = {
val runnable = new Runnable {
override def run: Unit = {
assertionP.failure(
new TestFailedException(
s"Did not receive a TX message after $timeout!",
failedCodeStackDepth = 0))
}
}
cancelable = Some {
actorSystem.scheduler.scheduleOnce(timeout, runnable)
}
}
_ <- rpc.sendToAddress(addressFromWallet, 1.bitcoin)
assertion <- assertionF
} yield assertion
}
it must "update the bloom filter with a TX" in { param =>
val WalletWithBitcoind(wallet, rpc) = param
implicit val chainConf: ChainAppConfig = config
implicit val nodeConf: NodeAppConfig = config
val assertionP = Promise[Assertion]
val assertionF = assertionP.future
// we want to schedule a runnable that aborts
// the test after a timeout, but then
// we need to cancel that runnable once
// we get a result
var cancelable: Option[Cancellable] = None
// the TX we sent from our wallet to bitcoind,
// we expect to get notified once this is
// confirmed
var txFromWallet: Option[Transaction] = None
val timeout = 15.seconds
for {
_ <- config.initialize()
firstBloom <- wallet.getBloomFilter()
spv <- {
val callback = SpvNodeCallbacks.onMerkleBlockReceived { (block, txs) =>
val isFromOurWallet = txFromWallet.exists(tx => txs.contains(tx))
// we might receive more merkle blocks than just the
// one for our TX
if (isFromOurWallet) {
assertionP.success(assert(isFromOurWallet))
}
}
val peer = Peer.fromBitcoind(rpc.instance)
val chain = {
val dao = BlockHeaderDAO()
ChainHandler(dao)
}
val spv =
SpvNode(peer, chain, bloomFilter = firstBloom, callbacks = callback)
spv.start()
}
_ <- spv.sync()
_ <- NodeTestUtil.awaitSync(spv, rpc)
addressFromBitcoind <- rpc.getNewAddress
tx <- wallet
.sendToAddress(addressFromBitcoind,
5.bitcoin,
SatoshisPerByte(100.sats))
.map { tx =>
txFromWallet = Some(tx)
tx
}
_ = {
val _ = spv.broadcastTransaction(tx)
val SpvNode(_, _, newBloom, _) = spv.updateBloomFilter(tx)
assert(newBloom.contains(tx.txId))
cancelable = Some {
actorSystem.scheduler.scheduleOnce(
timeout,
new Runnable {
override def run: Unit = {
if (!assertionP.isCompleted)
assertionP.failure(
new TestFailedException(
s"Did not receive a merkle block message after $timeout!",
failedCodeStackDepth = 0))
}
}
)
}
}
// this should confirm our TX
// since we updated the bloom filter
// we should get notified about the block
_ <- rpc.getNewAddress.flatMap(rpc.generateToAddress(1, _))
assertion <- assertionF
} yield assertion
}
}

View file

@ -0,0 +1,82 @@
package org.bitcoins.node.networking.peer
import org.bitcoins.testkit.util.BitcoinSUnitTest
import org.bitcoins.testkit.Implicits._
import org.bitcoins.core.protocol.blockchain.MerkleBlock
import org.bitcoins.testkit.core.gen.BlockchainElementsGenerator
import org.bitcoins.testkit.core.gen.TransactionGenerators
import org.scalacheck.Gen
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.protocol.blockchain.Block
import _root_.org.scalatest.compatible.Assertion
import scala.concurrent.Future
import scala.util.Success
import scala.util.Try
import scala.util.Failure
class MerkleBuffersTest extends BitcoinSUnitTest {
behavior of "MerkleBuffers"
/** Generating blocks and transactions take a little while,
* this is to prevent the test from taking a _really_ long
* time
*/
implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
customGenDrivenConfig(executions = 3)
it must "match a merkle block with its corresponding transactions" in {
val txsAndBlockGen: Gen[(Seq[Transaction], Seq[Transaction], Block)] = for {
txs <- Gen.nonEmptyListOf(TransactionGenerators.transaction)
otherTxs <- Gen.nonEmptyListOf(TransactionGenerators.transaction)
block <- BlockchainElementsGenerator.block(txs)
} yield (txs, otherTxs, block)
forAll(txsAndBlockGen) {
case (txs, otherTxs, block) =>
var receivedExpectedTXs: Option[Try[Assertion]] = None
var callbackCount: Int = 0
val callback: DataMessageHandler.OnMerkleBlockReceived = {
(_, merkleTxs) =>
receivedExpectedTXs = Some(
Try(
assert(txs == merkleTxs,
"Received TXs in callback was not the ones we put in")))
callbackCount = callbackCount + 1
}
val merkle = MerkleBlock(block, txs.map(_.txId))
val _ = MerkleBuffers.putMerkle(merkle)
txs.map { tx =>
val matches = MerkleBuffers.putTx(tx, Seq(callback))
assert(
matches,
s"TX ${tx.txIdBE} did not match any merkle block in MerkleBuffers")
}
otherTxs.map { tx =>
val matches = MerkleBuffers.putTx(tx, Seq(callback))
assert(
!matches,
s"Unrelated TX ${tx.txIdBE} did match merkle block in MerkleBuffers")
}
assert(callbackCount != 0,
"Callback was not called after processing all TXs!")
assert(callbackCount == 1,
s"Callback was called multiple times: $callbackCount")
receivedExpectedTXs match {
case None => fail("Callback was never called")
case Some(Success(assertion)) => assertion
case Some(Failure(exc)) => fail(exc)
}
}
}
}

View file

@ -12,7 +12,6 @@ import org.bitcoins.rpc.util.AsyncUtil
import scala.concurrent.Future import scala.concurrent.Future
import org.bitcoins.core.bloom.BloomFilter import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.p2p.FilterLoadMessage
import org.bitcoins.core.p2p.NetworkPayload import org.bitcoins.core.p2p.NetworkPayload
import org.bitcoins.core.protocol.transaction.Transaction import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.node.models.BroadcastAbleTransaction import org.bitcoins.node.models.BroadcastAbleTransaction
@ -22,6 +21,7 @@ import scala.util.Failure
import scala.util.Success import scala.util.Success
import org.bitcoins.db.P2PLogger import org.bitcoins.db.P2PLogger
import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.core.protocol.BitcoinAddress
case class SpvNode( case class SpvNode(
peer: Peer, peer: Peer,
@ -44,6 +44,36 @@ case class SpvNode(
PeerMessageSender(client) PeerMessageSender(client)
} }
/** Updates our bloom filter to match the given TX
*
* @return SPV node with the updated bloom filter
*/
def updateBloomFilter(transaction: Transaction): SpvNode = {
logger.info(s"Updating bloom filter with transaction=${transaction.txIdBE}")
val newBloom = bloomFilter.update(transaction)
// we could send filteradd messages, but we would
// then need to calculate all the new elements in
// the filter. this is easier:-)
peerMsgSender.sendFilterClearMessage()
peerMsgSender.sendFilterLoadMessage(newBloom)
copy(bloomFilter = newBloom)
}
/** Updates our bloom filter to match the given address
*
* @return SPV node with the updated bloom filter
*/
def updateBloomFilter(address: BitcoinAddress): SpvNode = {
logger.info(s"Updating bloom filter with address=$address")
val hash = address.hash
val newBloom = bloomFilter.insert(hash)
peerMsgSender.sendFilterAddMessage(hash)
copy(bloomFilter = newBloom)
}
/** /**
* Sends the given P2P to our peer. * Sends the given P2P to our peer.
* This method is useful for playing around * This method is useful for playing around
@ -74,8 +104,7 @@ case class SpvNode(
} }
} yield { } yield {
logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer") logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer")
val filterMsg = FilterLoadMessage(bloomFilter) val _ = peerMsgSender.sendFilterLoadMessage(bloomFilter)
val _ = send(filterMsg)
node node
} }
} }

View file

@ -287,10 +287,10 @@ object P2PClient extends P2PLogger {
remainingBytes.length) remainingBytes.length)
loop(newRemainingBytes, message :: accum) loop(newRemainingBytes, message :: accum)
} }
case Failure(exception) => case Failure(exc) =>
logger.error( logger.error(
"Failed to parse network message, could be because TCP frame isn't aligned", s"Failed to parse network message, could be because TCP frame isn't aligned: $exc")
exception)
//this case means that our TCP frame was not aligned with bitcoin protocol //this case means that our TCP frame was not aligned with bitcoin protocol
//return the unaligned bytes so we can apply them to the next tcp frame of bytes we receive //return the unaligned bytes so we can apply them to the next tcp frame of bytes we receive
//http://stackoverflow.com/a/37979529/967713 //http://stackoverflow.com/a/37979529/967713

View file

@ -19,6 +19,7 @@ import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.core.p2p.TypeIdentifier import org.bitcoins.core.p2p.TypeIdentifier
import org.bitcoins.core.p2p.MsgUnassigned import org.bitcoins.core.p2p.MsgUnassigned
import org.bitcoins.db.P2PLogger import org.bitcoins.db.P2PLogger
import org.bitcoins.core.p2p.Inventory
/** This actor is meant to handle a [[org.bitcoins.core.p2p.DataPayload DataPayload]] /** This actor is meant to handle a [[org.bitcoins.core.p2p.DataPayload DataPayload]]
* that a peer to sent to us on the p2p network, for instance, if we a receive a * that a peer to sent to us on the p2p network, for instance, if we a receive a
@ -29,9 +30,6 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)(
appConfig: NodeAppConfig) appConfig: NodeAppConfig)
extends P2PLogger { extends P2PLogger {
private val callbackNum = callbacks.onBlockReceived.length + callbacks.onMerkleBlockReceived.length + callbacks.onTxReceived.length
logger.debug(s"Given $callbackNum of callback(s)")
private val txDAO = BroadcastAbleTransactionDAO(SQLiteProfile) private val txDAO = BroadcastAbleTransactionDAO(SQLiteProfile)
def handleDataPayload( def handleDataPayload(
@ -68,29 +66,61 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)(
} }
FutureUtil.unit FutureUtil.unit
case headersMsg: HeadersMessage => case HeadersMessage(count, headers) =>
logger.debug(s"Received headers message with ${count.toInt} headers")
logger.trace( logger.trace(
s"Received headers message with ${headersMsg.count.toInt} headers") s"Received headers=${headers.map(_.hashBE.hex).mkString("[", ",", "]")}")
val headers = headersMsg.headers
val chainApiF = chainHandler.processHeaders(headers) val chainApiF = chainHandler.processHeaders(headers)
chainApiF.map { newApi => logger.trace(s"Requesting data for headers=${headers.length}")
val lastHeader = headers.last peerMsgSender.sendGetDataMessage(headers: _*)
val lastHash = lastHeader.hash
newApi.getBlockCount.map { count => chainApiF
logger.trace( .map { newApi =>
s"Processed headers, most recent has height=$count and hash=$lastHash.") if (headers.nonEmpty) {
val lastHeader = headers.last
val lastHash = lastHeader.hash
newApi.getBlockCount.map { count =>
logger.trace(
s"Processed headers, most recent has height=$count and hash=$lastHash.")
}
if (count.toInt == HeadersMessage.MaxHeadersCount) {
logger.error(
s"Received maximum amount of headers in one header message. This means we are not synced, requesting more")
peerMsgSender.sendGetHeadersMessage(lastHash)
} else {
logger.debug(
List(s"Received headers=${count.toInt} in one message,",
"which is less than max. This means we are synced,",
"not requesting more.")
.mkString(" "))
}
}
}
.failed
.map { err =>
logger.error(s"Error when processing headers message", err)
} }
peerMsgSender.sendGetHeadersMessage(lastHash)
}
case msg: BlockMessage => case msg: BlockMessage =>
Future { callbacks.onBlockReceived.foreach(_.apply(msg.block)) } Future { callbacks.onBlockReceived.foreach(_.apply(msg.block)) }
case msg: TransactionMessage => case TransactionMessage(tx) =>
Future { callbacks.onTxReceived.foreach(_.apply(msg.transaction)) } val belongsToMerkle =
case msg: MerkleBlockMessage => MerkleBuffers.putTx(tx, callbacks.onMerkleBlockReceived)
Future { if (belongsToMerkle) {
callbacks.onMerkleBlockReceived.foreach(_.apply(msg.merkleBlock)) logger.trace(
s"Transaction=${tx.txIdBE} belongs to merkleblock, not calling callbacks")
FutureUtil.unit
} else {
logger.trace(
s"Transaction=${tx.txIdBE} does not belong to merkleblock, processing given callbacks")
Future { callbacks.onTxReceived.foreach(_.apply(tx)) }
} }
case MerkleBlockMessage(merkleBlock) =>
MerkleBuffers.putMerkle(merkleBlock)
FutureUtil.unit
case invMsg: InventoryMessage => case invMsg: InventoryMessage =>
handleInventoryMsg(invMsg = invMsg, peerMsgSender = peerMsgSender) handleInventoryMsg(invMsg = invMsg, peerMsgSender = peerMsgSender)
} }
@ -100,7 +130,11 @@ class DataMessageHandler(callbacks: SpvNodeCallbacks, chainHandler: ChainApi)(
invMsg: InventoryMessage, invMsg: InventoryMessage,
peerMsgSender: PeerMessageSender): Future[Unit] = { peerMsgSender: PeerMessageSender): Future[Unit] = {
logger.info(s"Received inv=${invMsg}") logger.info(s"Received inv=${invMsg}")
val getData = GetDataMessage(invMsg.inventories) val getData = GetDataMessage(invMsg.inventories.map {
case Inventory(TypeIdentifier.MsgBlock, hash) =>
Inventory(TypeIdentifier.MsgFilteredBlock, hash)
case other: Inventory => other
})
peerMsgSender.sendMsg(getData) peerMsgSender.sendMsg(getData)
FutureUtil.unit FutureUtil.unit
@ -112,8 +146,8 @@ object DataMessageHandler {
/** Callback for handling a received block */ /** Callback for handling a received block */
type OnBlockReceived = Block => Unit type OnBlockReceived = Block => Unit
/** Callback for handling a received Merkle block */ /** Callback for handling a received Merkle block with its corresponding TXs */
type OnMerkleBlockReceived = MerkleBlock => Unit type OnMerkleBlockReceived = (MerkleBlock, Vector[Transaction]) => Unit
/** Callback for handling a received transaction */ /** Callback for handling a received transaction */
type OnTxReceived = Transaction => Unit type OnTxReceived = Transaction => Unit

View file

@ -0,0 +1,115 @@
package org.bitcoins.node.networking.peer
import org.bitcoins.core.util.BitcoinSLogger
import scala.collection.mutable
import org.bitcoins.core.protocol.blockchain.MerkleBlock
import org.bitcoins.core.protocol.transaction.Transaction
/**
* A buffer of merkleblocks and the transactions associated with them.
*
* When receiving a merkleblock message over the P2P network, the
* corresponding transactions are sent immediately after. That means
* we have to correlate the received merkleblocks with the matching
* transactions.
*
* This buffer is responsible for calling the approriate callbacks
* once a merkle block has received all its transactions.
*/
private[peer] object MerkleBuffers extends BitcoinSLogger {
private type MerkleBlocksWithTransactions =
mutable.Map[MerkleBlock, mutable.Builder[Transaction, Vector[Transaction]]]
private val underlyingMap: MerkleBlocksWithTransactions = mutable.Map.empty
/** Adds the given merkleblock to the buffer */
def putMerkle(merkle: MerkleBlock): Unit = {
val tree = merkle.partialMerkleTree
val matches = tree.extractMatches
logger.debug(s"Received merkle block, expecting ${matches.length} TX(s)")
if (matches.nonEmpty) {
logger.trace(s"Adding merkleBlock=${merkle.blockHeader.hashBE} to buffer")
underlyingMap.put(merkle,
// it's important to use a collection
// type that can call .result() without
// clearing the builder
Vector.newBuilder)
} else {
logger.trace(
s"Merkleblock=${merkle.blockHeader.hashBE} has no matches, not adding to buffer")
}
()
}
/** Attempts to add the given transaction to a corresponding
* merkleblock in the buffer.
*
* @param tx The transaction to (maybe) add to the buffer
* @param callbacks The callbacks to execute if we're
* finished processing a merkleblock
*
* @return If the transaction matches a merkle block, returns true.
* Otherwise, false.
*/
def putTx(
tx: Transaction,
callbacks: Seq[DataMessageHandler.OnMerkleBlockReceived]): Boolean = {
val blocksInBuffer = underlyingMap.keys.toList
logger.trace(s"Looking for transaction=${tx.txIdBE} in merkleblock buffer")
logger.trace(s"Merkleblocks in buffer: ${blocksInBuffer.length}")
blocksInBuffer.find { block =>
val matches = block.partialMerkleTree.extractMatches
logger.trace(
s"Block=${block.blockHeader.hashBE} has matches=${matches.map(_.flip)}")
matches.exists(_ == tx.txId)
} match {
case None =>
logger.debug(
s"Transaction=${tx.txIdBE} does not belong to any merkle block")
false
case Some(key) =>
handleMerkleMatch(tx, merkleBlock = key, callbacks = callbacks)
}
}
// TODO Scaladoc
private def handleMerkleMatch(
transaction: Transaction,
merkleBlock: MerkleBlock,
callbacks: Seq[DataMessageHandler.OnMerkleBlockReceived]) = {
val merkleBlockMatches = merkleBlock.partialMerkleTree.extractMatches
val merkleHash = merkleBlock.blockHeader.hashBE
val txHash = transaction.txIdBE
logger.debug(s"Transaction=$txHash matched merkleBlock=$merkleHash")
logger.trace(s"Adding transaction=$txHash to buffer")
val builder = underlyingMap(merkleBlock) // TODO: error handling
builder += transaction
val transactionSoFar = builder.result()
val transactionSoFarCount = transactionSoFar.length
val matchesCount = merkleBlockMatches.length
if (transactionSoFarCount == matchesCount) {
logger.debug(
s"We've received all transactions ($transactionSoFarCount) for merkleBlock=$merkleHash")
logger.trace(s"Removing merkle block from buffer")
underlyingMap.remove(merkleBlock) // TODO: error handling
logger.trace(s"Calling merkle block callback(s)")
callbacks.foreach(_.apply(merkleBlock, transactionSoFar))
} else {
logger.trace(
s"We've received $transactionSoFarCount, expecting $matchesCount")
assert(transactionSoFarCount < matchesCount)
}
true
}
}

View file

@ -9,6 +9,9 @@ import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.core.protocol.transaction.Transaction import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.db.P2PLogger import org.bitcoins.db.P2PLogger
import org.bitcoins.core.crypto.HashDigest
import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.protocol.blockchain.BlockHeader
case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig) case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
extends P2PLogger { extends P2PLogger {
@ -67,12 +70,38 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
sendMsg(message) sendMsg(message)
} }
def sendFilterClearMessage(): Unit = {
sendMsg(FilterClearMessage)
}
def sendFilterAddMessage(hash: HashDigest): Unit = {
val message = FilterAddMessage.fromHash(hash)
logger.trace(s"Sending filteradd=$message to peer=${client.peer}")
sendMsg(message)
}
def sendFilterLoadMessage(bloom: BloomFilter): Unit = {
val message = FilterLoadMessage(bloom)
logger.trace(s"Sending filterload=$message to peer=${client.peer}")
sendMsg(message)
}
def sendTransactionMessage(transaction: Transaction): Unit = { def sendTransactionMessage(transaction: Transaction): Unit = {
val message = TransactionMessage(transaction) val message = TransactionMessage(transaction)
logger.trace(s"Sending txmessage=$message to peer=${client.peer}") logger.trace(s"Sending txmessage=$message to peer=${client.peer}")
sendMsg(message) sendMsg(message)
} }
/** Sends a request for filtered blocks matching the given headers */
def sendGetDataMessage(headers: BlockHeader*): Unit = {
val inventories =
headers.map(header =>
Inventory(TypeIdentifier.MsgFilteredBlock, header.hash))
val message = GetDataMessage(inventories)
logger.info(s"Sending getdata=$message to peer=${client.peer}")
sendMsg(message)
}
private[node] def sendMsg(msg: NetworkPayload): Unit = { private[node] def sendMsg(msg: NetworkPayload): Unit = {
logger.debug(s"Sending msg=${msg.commandName} to peer=${socket}") logger.debug(s"Sending msg=${msg.commandName} to peer=${socket}")
val newtworkMsg = NetworkMessage(conf.network, msg) val newtworkMsg = NetworkMessage(conf.network, msg)

View file

@ -19,7 +19,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.ActorSystem import akka.actor.ActorSystem
import org.bitcoins.core.util.BitcoinSLogger import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.rpc.util.AsyncUtil import org.bitcoins.testkit.async.TestAsyncUtil
import org.bitcoins.core.bloom.BloomFilter import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.bloom.BloomUpdateAll import org.bitcoins.core.bloom.BloomUpdateAll
@ -116,12 +116,30 @@ abstract class NodeTestUtil extends BitcoinSLogger {
} }
} }
/** Awaits sync between the given SPV node and bitcoind client */ /** Checks if the given light client and bitcoind
* has the same number of blocks in their blockchains
*/
def isSameBlockCount(spv: SpvNode, rpc: BitcoindRpcClient)(
implicit ec: ExecutionContext): Future[Boolean] = {
val rpcCountF = rpc.getBlockCount
val spvCountF = spv.chainApi.getBlockCount
for {
spvCount <- spvCountF
rpcCount <- rpcCountF
} yield rpcCount == spvCount
}
/** Awaits sync between the given SPV node and bitcoind client
*
* TODO: We should check for hash, not block height. however,
* our way of determining what the best hash is when having
* multiple tips is not good enough yet
*/
def awaitSync(node: SpvNode, rpc: BitcoindRpcClient)( def awaitSync(node: SpvNode, rpc: BitcoindRpcClient)(
implicit sys: ActorSystem): Future[Unit] = { implicit sys: ActorSystem): Future[Unit] = {
import sys.dispatcher import sys.dispatcher
AsyncUtil TestAsyncUtil
.retryUntilSatisfiedF(() => isSameBestHash(node, rpc), 500.milliseconds) .retryUntilSatisfiedF(() => isSameBlockCount(node, rpc), 500.milliseconds)
} }
} }

View file

@ -15,7 +15,7 @@ class WalletBloomTest extends BitcoinSWalletTest {
override type FixtureParam = WalletWithBitcoind override type FixtureParam = WalletWithBitcoind
override def withFixture(test: OneArgAsyncTest): FutureOutcome = override def withFixture(test: OneArgAsyncTest): FutureOutcome =
withNewWalletAndBitcoind(test) withFundedWalletAndBitcoind(test)
it should "generate a bloom filter that matches the pubkeys in our wallet" in { it should "generate a bloom filter that matches the pubkeys in our wallet" in {
param => param =>
@ -32,19 +32,12 @@ class WalletBloomTest extends BitcoinSWalletTest {
} }
} }
// TODO: change fixture to withFundedWalletAndBitcoind once #577 goes in
// https://github.com/bitcoin-s/bitcoin-s/pull/577/files#diff-0fb6ac004fe1e550b7c13258d7d0706cR154
it should "generate a bloom filter that matches the outpoints in our wallet" in { it should "generate a bloom filter that matches the outpoints in our wallet" in {
param => param =>
val WalletWithBitcoind(walletApi, bitcoind) = param val WalletWithBitcoind(walletApi, bitcoind) = param
val wallet = walletApi.asInstanceOf[Wallet] val wallet = walletApi.asInstanceOf[Wallet]
for { for {
address <- wallet.getNewAddress()
tx <- bitcoind
.sendToAddress(address, 5.bitcoins)
.flatMap(bitcoind.getRawTransaction(_))
_ <- wallet.processTransaction(tx.hex, confirmations = 0)
outpoints <- wallet.listOutpoints() outpoints <- wallet.listOutpoints()
bloom <- wallet.getBloomFilter() bloom <- wallet.getBloomFilter()

View file

@ -28,7 +28,7 @@ private[wallet] trait TransactionProcessing extends KeyHandlingLogger {
logger.info( logger.info(
s"Processing transaction=${transaction.txIdBE} with confirmations=$confirmations") s"Processing transaction=${transaction.txIdBE} with confirmations=$confirmations")
processTransactionImpl(transaction, confirmations).map { processTransactionImpl(transaction, confirmations).map {
case ProcessTxResult(outgoing, incoming) => case ProcessTxResult(incoming, outgoing) =>
logger.info( logger.info(
s"Finished processing of transaction=${transaction.txIdBE}. Relevant incomingTXOs=${incoming.length}, outgoingTXOs=${outgoing.length}") s"Finished processing of transaction=${transaction.txIdBE}. Relevant incomingTXOs=${incoming.length}, outgoingTXOs=${outgoing.length}")
this this