Pass bloom filter to SPV node (#514)

* Add getAddressInfo to Wallet API

* Add Bloom filter and event callbacks to SPV node

* Move wallet test trait to testkit

* Test for connecting SPV node with wallet

* Tune logging in SPV node

* Clean up BloomFilter

* Proper toString in inventory and inventorymessage

* Actually pass in callbacks to SPV node

* Fix rebase screwup and partially respond to code review

* Default to file based databases in tests

This is due to issues with deadlocks with
in-memory based databases. We need DBs
to stay alive between connections, but
not across tests.

In DB intensive chain validation tests we
use in-memory databases instead. This
seems like a reasonable tradeoff between
simplicity and speed.

* Make SpvNodeCallbacks contain sequence of functions within

* Make Bloom filter non-optional in SpvNode
This commit is contained in:
Torkel Rogstad 2019-06-17 13:48:26 +02:00 committed by Chris Stewart
parent 54f99ed8fa
commit 90b9b6aa9a
31 changed files with 398 additions and 102 deletions

View file

@ -142,7 +142,8 @@ sealed abstract class BloomFilter extends NetworkElement {
/**
* Checks if the transaction's txid, or any of the constants in it's scriptPubKeys/scriptSigs match our BloomFilter
* See [[https://github.com/bitcoin/bips/blob/master/bip-0037.mediawiki#filter-matching-algorithm BIP37]]
*
* @see [[https://github.com/bitcoin/bips/blob/master/bip-0037.mediawiki#filter-matching-algorithm BIP37]]
* for exact details on what is relevant to a bloom filter and what is not relevant
*/
def isRelevant(transaction: Transaction): Boolean = {
@ -150,8 +151,9 @@ sealed abstract class BloomFilter extends NetworkElement {
//pull out all of the constants in the scriptPubKey's
val constantsWithOutputIndex = scriptPubKeys.zipWithIndex.flatMap {
case (scriptPubKey, index) =>
val constants = scriptPubKey.asm.filter(_.isInstanceOf[ScriptConstant])
constants.map(c => (c, index))
scriptPubKey.asm.collect {
case c: ScriptConstant => (c, index)
}
}
//check if the bloom filter contains any of the script constants in our outputs
@ -162,8 +164,9 @@ sealed abstract class BloomFilter extends NetworkElement {
val scriptSigs = transaction.inputs.map(_.scriptSignature)
val constantsWithInputIndex = scriptSigs.zipWithIndex.flatMap {
case (scriptSig, index) =>
val constants = scriptSig.asm.filter(_.isInstanceOf[ScriptConstant])
constants.map(c => (c, index))
scriptSig.asm.collect {
case c: ScriptConstant => (c, index)
}
}
//check if the filter contains any of the prevouts in this tx
val containsOutPoint =
@ -180,7 +183,8 @@ sealed abstract class BloomFilter extends NetworkElement {
/**
* Updates this bloom filter to contain the relevant information for the given Transaction
* See [[https://github.com/bitcoin/bips/blob/master/bip-0037.mediawiki#filter-matching-algorithm BIP37]]
*
* @see [[https://github.com/bitcoin/bips/blob/master/bip-0037.mediawiki#filter-matching-algorithm BIP37]]
* for the exact details on what parts of a transaction is added to the bloom filter
*/
def update(transaction: Transaction): BloomFilter = flags match {
@ -190,12 +194,11 @@ sealed abstract class BloomFilter extends NetworkElement {
val outPoints: Seq[TransactionOutPoint] =
scriptPubKeys.zipWithIndex.flatMap {
case (scriptPubKey, index) =>
//constants that matched inside of our current filter
val constants = scriptPubKey.asm.filter(c =>
c.isInstanceOf[ScriptConstant] && contains(c.bytes))
//we need to create a new outpoint in the filter if a constant in the scriptPubKey matched
constants.map(_ =>
TransactionOutPoint(transaction.txId, UInt32(index)))
// we filter all constants, and create an outpoint if the constant matches our filter
scriptPubKey.asm.collect {
case c: ScriptConstant if contains(c.bytes) =>
TransactionOutPoint(transaction.txId, UInt32(index))
}
}
logger.debug("Inserting outPoints: " + outPoints)
@ -221,7 +224,8 @@ sealed abstract class BloomFilter extends NetworkElement {
/**
* Updates a bloom filter according to the rules specified by the
* [[org.bitcoins.core.bloom.BloomUpdateP2PKOnly BloomUpdateP2PKOnly]] flag
* See [[https://github.com/bitcoin/bips/blob/master/bip-0037.mediawiki#filter-matching-algorithm BIP37]]
*
* @see [[https://github.com/bitcoin/bips/blob/master/bip-0037.mediawiki#filter-matching-algorithm BIP37]]
* for the exact rules on updating a bloom filter with this flag set
*/
def updateP2PKOnly(

View file

@ -25,6 +25,8 @@ trait Inventory extends NetworkElement {
def hash: DoubleSha256Digest
override def bytes: ByteVector = RawInventorySerializer.write(this)
override def toString(): String = s"Inventory($typeIdentifier, $hash)"
}
object Inventory extends Factory[Inventory] {
@ -43,6 +45,3 @@ object Inventory extends Factory[Inventory] {
InventoryImpl(typeIdentifier, hash)
}
}

View file

@ -327,6 +327,18 @@ trait InventoryMessage extends DataPayload {
override def commandName = NetworkPayload.invCommandName
override def bytes: ByteVector = RawInventoryMessageSerializer.write(this)
override def toString(): String = {
val invCount = inventoryCount.toInt
val limit = 5
val invList = if (invCount > limit) {
inventories.take(limit).mkString + "..."
} else {
inventories.mkString
}
s"InventoryMessage($invCount inv(s)${if (invList.nonEmpty) ", " + invList
else ""})"
}
}
/**
@ -453,8 +465,11 @@ trait TransactionMessage extends DataPayload {
* The transaction being sent over the wire
*/
def transaction: Transaction
override def commandName = NetworkPayload.transactionCommandName
override def bytes: ByteVector = RawTransactionMessageSerializer.write(this)
override def toString(): String = s"TransactionMessage(${transaction.txIdBE})"
}
/**

View file

@ -34,6 +34,9 @@
<logger name="org.bitcoins.node.config" level="INFO" />
<logger name="org.bitcoins.wallet.config" level="INFO" />
<!-- See incoming message names and the peer it's sent from -->
<logger name="org.bitcoins.node.networking.peer.PeerMessageReceiver" level="INFO" />
<!-- inspect resolved db connection -->
<logger name="org.bitcoins.db.SafeDatabase" level="INFO" />

View file

@ -0,0 +1,143 @@
package org.bitcoins.node
import org.bitcoins.core.currency._
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.scalatest.FutureOutcome
import org.bitcoins.testkit.BitcoinSAppConfig
import org.bitcoins.wallet.config.WalletAppConfig
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import scala.concurrent.Future
import org.bitcoins.node.networking.peer.DataMessageHandler
import scala.concurrent.Promise
import scala.concurrent.duration._
import org.scalatest.compatible.Assertion
import org.scalatest.exceptions.TestFailedException
import org.bitcoins.core.crypto.DoubleSha256Digest
import org.bitcoins.rpc.util.AsyncUtil
import org.bitcoins.testkit.node.NodeTestUtil
import akka.actor.Cancellable
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.crypto.DoubleSha256DigestBE
class NodeWithWalletTest extends BitcoinSWalletTest {
override type FixtureParam = WalletWithBitcoind
def withFixture(test: OneArgAsyncTest): FutureOutcome =
withNewWalletAndBitcoind(test)
it must "load a bloom filter and receive information about received payments" in {
param =>
val WalletWithBitcoind(wallet, rpc) = param
/**
* This is not ideal, how do we get one implicit value (`config`)
* to resolve to multiple implicit parameters?
*/
implicit val nodeConfig: NodeAppConfig = config
implicit val chainConfig: ChainAppConfig = config
var expectedTxId: Option[DoubleSha256Digest] = None
var unexpectedTxId: Option[DoubleSha256Digest] = None
var cancellable: Option[Cancellable] = None
val completionP = Promise[Assertion]
val callbacks = {
val onBlock: DataMessageHandler.OnBlockReceived = { block =>
completionP.failure(
new TestFailedException(
s"Received a block! We are only expecting merkle blocks",
failedCodeStackDepth = 0))
}
val onTx: DataMessageHandler.OnTxReceived = { tx =>
if (expectedTxId.contains(tx.txId)) {
cancellable.map(_.cancel())
completionP.success(succeed)
} else if (unexpectedTxId.contains(tx.txId)) {
completionP.failure(
new TestFailedException(
s"Got ${tx.txId}, which is a TX we expect to NOT get notified about!",
failedCodeStackDepth = 0))
} else {
logger.info(s"Didn't match expected TX or unexpected TX")
}
}
SpvNodeCallbacks(
onBlockReceived = Seq(onBlock),
onTxReceived = Seq(onTx)
)
}
def processWalletTx(tx: DoubleSha256DigestBE) = {
expectedTxId = Some(tx.flip)
// how long we're waiting for a tx notify before failing the test
val delay = 15.seconds
val runnable: Runnable = new Runnable {
override def run = {
val msg =
s"Did not receive sent transaction within $delay"
logger.error(msg)
completionP.failure(new TestFailedException(msg, 0))
}
}
cancellable = Some(actorSystem.scheduler.scheduleOnce(delay, runnable))
tx
}
for {
_ <- config.initialize()
address <- wallet.getNewAddress()
bloom <- wallet.getBloomFilter()
spv <- {
val peer = Peer.fromBitcoind(rpc.instance)
val chainHandler = {
val bhDao = BlockHeaderDAO()
ChainHandler(bhDao, config)
}
val spv =
SpvNode(peer,
chainHandler,
bloomFilter = bloom,
callbacks = callbacks)
spv.start()
}
_ <- spv.sync()
_ <- NodeTestUtil.awaitSync(spv, rpc)
ourTxid <- rpc.sendToAddress(address, 1.bitcoin).map(processWalletTx)
notOurTxid <- rpc.getNewAddress
.flatMap(rpc.sendToAddress(_, 1.bitcoin))
.map { tx =>
// we're generating a TX from bitcoind that should _not_ trigger
// our bloom filter, and not get sent to us. if it gets sent to
// us we fail the test
unexpectedTxId = Some(tx.flip)
tx
}
ourTx <- rpc.getTransaction(ourTxid)
notOurTx <- rpc.getTransaction(notOurTxid)
assertion <- {
assert(bloom.isRelevant(ourTx.hex))
assert(!bloom.isRelevant(notOurTx.hex))
completionP.future
}
} yield assertion
}
}

View file

@ -7,6 +7,7 @@ import org.bitcoins.testkit.node.fixture.SpvNodeConnectedWithBitcoind
import org.scalatest.FutureOutcome
import scala.concurrent.Future
import org.bitcoins.testkit.node.NodeTestUtil
class SpvNodeTest extends NodeUnitTest {
@ -36,16 +37,9 @@ class SpvNodeTest extends NodeUnitTest {
sync <- spvNode.sync()
} yield sync
def isSameBestHash(): Future[Boolean] = {
for {
spvBestHash <- spvNode.chainApi.getBestBlockHash
hash <- hashF
} yield spvBestHash == hash
}
spvSyncF.flatMap { _ =>
RpcUtil
.retryUntilSatisfiedF(isSameBestHash)
NodeTestUtil
.awaitSync(spvNode, bitcoind)
.map(_ => succeed)
}

View file

@ -39,7 +39,7 @@ object Main extends App with BitcoinSLogger {
val peer = Peer(nip)
logger.info(s"Starting spv node")
val spvNodeF = SpvNode(peer, chainApi).start()
val spvNodeF = SpvNode(peer, chainApi, bloomFilter = ???).start()
logger.info(s"Starting SPV node sync")
spvNodeF.map { spvNode =>

View file

@ -15,8 +15,16 @@ import org.bitcoins.node.networking.peer.{
import org.bitcoins.rpc.util.AsyncUtil
import scala.concurrent.Future
import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.p2p.FilterLoadMessage
import org.bitcoins.core.p2p.NetworkPayload
case class SpvNode(peer: Peer, chainApi: ChainApi)(
case class SpvNode(
peer: Peer,
chainApi: ChainApi,
bloomFilter: BloomFilter,
callbacks: SpvNodeCallbacks = SpvNodeCallbacks.empty
)(
implicit system: ActorSystem,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig)
@ -24,7 +32,7 @@ case class SpvNode(peer: Peer, chainApi: ChainApi)(
import system.dispatcher
private val peerMsgRecv =
PeerMessageReceiver.newReceiver
PeerMessageReceiver.newReceiver(callbacks)
private val client: Client =
Client(context = system, peer = peer, peerMessageReceiver = peerMsgRecv)
@ -33,6 +41,16 @@ case class SpvNode(peer: Peer, chainApi: ChainApi)(
PeerMessageSender(client, nodeAppConfig.network)
}
/**
* Sends the given P2P to our peer.
* This method is useful for playing around
* with P2P messages, therefore marked as
* `private[node]`.
*/
private[node] def send(msg: NetworkPayload): Unit = {
peerMsgSender.sendMsg(msg)
}
/** Starts our spv node */
def start(): Future[SpvNode] = {
for {
@ -51,8 +69,12 @@ case class SpvNode(peer: Peer, chainApi: ChainApi)(
this
}
}
} yield node
} yield {
logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer")
val filterMsg = FilterLoadMessage(bloomFilter)
val _ = send(filterMsg)
node
}
}
/** Stops our spv node */

View file

@ -0,0 +1,26 @@
package org.bitcoins.node
import org.bitcoins.node.networking.peer.DataMessageHandler._
/**
* Callbacks for responding to events in the SPV node.
* The approriate callback is executed whenver the node receives
* a `getdata` message matching it.
*
*/
case class SpvNodeCallbacks(
onTxReceived: Seq[OnTxReceived] = Seq.empty,
onBlockReceived: Seq[OnBlockReceived] = Seq.empty,
onMerkleBlockReceived: Seq[OnMerkleBlockReceived] = Seq.empty
)
object SpvNodeCallbacks {
/** Empty callbacks that does nothing with the received data */
val empty: SpvNodeCallbacks =
SpvNodeCallbacks(
onTxReceived = Seq.empty,
onBlockReceived = Seq.empty,
onMerkleBlockReceived = Seq.empty
)
}

View file

@ -4,6 +4,7 @@ import java.net.InetSocketAddress
import org.bitcoins.core.p2p.NetworkIpAddress
import org.bitcoins.db.DbRowAutoInc
import org.bitcoins.rpc.config.BitcoindInstance
case class Peer(networkIpAddress: NetworkIpAddress, id: Option[Long] = None)
extends DbRowAutoInc[Peer] {
@ -15,6 +16,9 @@ case class Peer(networkIpAddress: NetworkIpAddress, id: Option[Long] = None)
this.copy(id = Some(id))
}
override def toString(): String =
s"Peer(${networkIpAddress.address}:${networkIpAddress.port})"
}
object Peer {
@ -27,4 +31,12 @@ object Peer {
val nip = NetworkIpAddress.fromInetSocketAddress(socket = socket)
fromNetworkIpAddress(nip)
}
/**
* Constructs a peer from the given `bitcoind` instance
*/
def fromBitcoind(bitcoind: BitcoindInstance): Peer = {
val socket = new InetSocketAddress(bitcoind.uri.getHost, bitcoind.p2pPort)
fromSocket(socket)
}
}

View file

@ -5,28 +5,36 @@ import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.core.util.{BitcoinSLogger, FutureUtil}
import org.bitcoins.core.p2p.{
DataPayload,
HeadersMessage,
InventoryMessage
}
import org.bitcoins.core.p2p.{DataPayload, HeadersMessage, InventoryMessage}
import scala.concurrent.{ExecutionContext, Future}
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.blockchain.MerkleBlock
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.p2p.BlockMessage
import org.bitcoins.core.p2p.TransactionMessage
import org.bitcoins.core.p2p.MerkleBlockMessage
import org.bitcoins.node.SpvNodeCallbacks
import org.bitcoins.core.p2p.GetDataMessage
/** This actor is meant to handle a [[org.bitcoins.node.messages.DataPayload]]
* that a peer to sent to us on the p2p network, for instance, if we a receive a
* [[HeadersMessage]] we should store those headers in our database
*/
class DataMessageHandler()(
class DataMessageHandler(callbacks: SpvNodeCallbacks)(
implicit ec: ExecutionContext,
appConfig: ChainAppConfig)
extends BitcoinSLogger {
val callbackNum = callbacks.onBlockReceived.length + callbacks.onMerkleBlockReceived.length + callbacks.onTxReceived.length
logger.debug(s"Given $callbackNum of callback(s)")
private val blockHeaderDAO: BlockHeaderDAO = BlockHeaderDAO()
def handleDataPayload(
payload: DataPayload,
peerMsgSender: PeerMessageSender): Future[Unit] = {
payload match {
case headersMsg: HeadersMessage =>
val headers = headersMsg.headers
@ -38,6 +46,14 @@ class DataMessageHandler()(
val lastHash = headers.last.hash
peerMsgSender.sendGetHeadersMessage(lastHash)
}
case msg: BlockMessage =>
Future { callbacks.onBlockReceived.foreach(_.apply(msg.block)) }
case msg: TransactionMessage =>
Future { callbacks.onTxReceived.foreach(_.apply(msg.transaction)) }
case msg: MerkleBlockMessage =>
Future {
callbacks.onMerkleBlockReceived.foreach(_.apply(msg.merkleBlock))
}
case invMsg: InventoryMessage =>
handleInventoryMsg(invMsg = invMsg, peerMsgSender = peerMsgSender)
}
@ -47,8 +63,25 @@ class DataMessageHandler()(
invMsg: InventoryMessage,
peerMsgSender: PeerMessageSender): Future[Unit] = {
logger.info(s"Received inv=${invMsg}")
val getData = GetDataMessage(invMsg.inventories)
peerMsgSender.sendMsg(getData)
FutureUtil.unit
}
}
object DataMessageHandler {
/** Callback for handling a received block */
type OnBlockReceived = Block => Unit
/** Callback for handling a received Merkle block */
type OnMerkleBlockReceived = MerkleBlock => Unit
/** Callback for handling a received transaction */
type OnTxReceived = Transaction => Unit
/** Does nothing */
def noop[T]: T => Unit = _ => ()
}

View file

@ -8,9 +8,15 @@ import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.core.p2p._
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.Client
import org.bitcoins.node.networking.peer.PeerMessageReceiverState.{Disconnected, Initializing, Normal, Preconnection}
import org.bitcoins.node.networking.peer.PeerMessageReceiverState.{
Disconnected,
Initializing,
Normal,
Preconnection
}
import scala.util.{Failure, Success, Try}
import org.bitcoins.node.SpvNodeCallbacks
/**
* Responsible for receiving messages from a peer on the
@ -21,13 +27,13 @@ import scala.util.{Failure, Success, Try}
*/
class PeerMessageReceiver(
state: PeerMessageReceiverState,
callbacks: SpvNodeCallbacks
)(
implicit ref: ActorRefFactory,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig)(implicit ref: ActorRefFactory)
chainAppConfig: ChainAppConfig)
extends BitcoinSLogger {
implicit private val nodeConfig = nodeAppConfig
implicit private val chainConfig = chainAppConfig
import ref.dispatcher
//TODO: Really bad to just modify this internal state
@ -107,13 +113,13 @@ class PeerMessageReceiver(
def handleNetworkMessageReceived(
networkMsgRecv: PeerMessageReceiver.NetworkMessageReceived): Unit = {
//create a way to send a response if we need too
val peerMsgSender =
PeerMessageSender(networkMsgRecv.client, chainAppConfig.network)
val client = networkMsgRecv.client
logger.info(
s"Received message=${networkMsgRecv.msg.header.commandName} from peer=${peerOpt
.map(_.socket)} ")
//create a way to send a response if we need too
val peerMsgSender = PeerMessageSender(client, chainAppConfig.network)
logger.debug(
s"Received message=${networkMsgRecv.msg.header.commandName} from peer=${client.peer} ")
networkMsgRecv.msg.payload match {
case controlPayload: ControlPayload =>
handleControlPayload(payload = controlPayload, sender = peerMsgSender)
@ -134,7 +140,7 @@ class PeerMessageReceiver(
private def handleDataPayload(
payload: DataPayload,
sender: PeerMessageSender): Unit = {
val dataMsgHandler = new DataMessageHandler()
val dataMsgHandler = new DataMessageHandler(callbacks)
//else it means we are receiving this data payload from a peer,
//we need to handle it
dataMsgHandler.handleDataPayload(payload, sender)
@ -225,20 +231,20 @@ object PeerMessageReceiver {
case class NetworkMessageReceived(msg: NetworkMessage, client: Client)
extends PeerMessageReceiverMsg
def apply(state: PeerMessageReceiverState)(
def apply(
state: PeerMessageReceiverState,
callbacks: SpvNodeCallbacks = SpvNodeCallbacks.empty)(
implicit ref: ActorRefFactory,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig
): PeerMessageReceiver = {
new PeerMessageReceiver(state, nodeAppConfig, chainAppConfig)(ref)
new PeerMessageReceiver(state, callbacks)
}
def newReceiver(
def newReceiver(callbacks: SpvNodeCallbacks = SpvNodeCallbacks.empty)(
implicit nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig,
ref: ActorRefFactory): PeerMessageReceiver = {
new PeerMessageReceiver(state = PeerMessageReceiverState.fresh(),
nodeAppConfig,
chainAppConfig)(ref)
new PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), callbacks)
}
}

View file

@ -9,24 +9,17 @@ import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.core.p2p._
import org.bitcoins.node.networking.Client
/**
* Created by chris on 6/7/16.
* This actor is the middle man between our [[Client]] and higher level actors such as
* [[org.bitcoins.node.networking.BlockActor]]. When it receives a message, it tells [[Client]] to create connection to a peer,
* then it exchanges [[VersionMessage]], [[VerAckMessage]] and [[org.bitcoins.node.messages.PingMessage]]/[[PongMessage]] message
* with our peer on the network. When the Client finally responds to the [[NetworkMessage]] we originally
* sent it sends that [[NetworkMessage]] back to the actor that requested it.
*/
class PeerMessageSender(client: Client)(implicit np: NetworkParameters)
extends BitcoinSLogger {
private val socket = client.peer.socket
/** Initiates a connection with the given [[Peer]] */
/** Initiates a connection with the given peer */
def connect(): Unit = {
logger.info(s"Attempting to connect to peer=$socket")
(client.actor ! Tcp.Connect(socket))
}
/** Disconnects the given peer */
def disconnect(): Unit = {
logger.info(s"Disconnecting peer at socket=${socket}")
(client.actor ! Tcp.Close)
@ -53,7 +46,7 @@ class PeerMessageSender(client: Client)(implicit np: NetworkParameters)
sendMsg(sendHeadersMsg)
}
private def sendMsg(msg: NetworkPayload): Unit = {
private[node] def sendMsg(msg: NetworkPayload): Unit = {
logger.debug(
s"PeerMessageSender sending to peer=${socket} msg=${msg.commandName}")
val newtworkMsg = NetworkMessage(np, msg)
@ -80,9 +73,6 @@ object PeerMessageSender {
/** Accumulators network messages while we are doing a handshake with our peer
* and caches a peer handler actor so we can send a [[HandshakeFinished]]
* message back to the actor when we are fully connected
*
* @param networkMsgs
* @param peerHandler
*/
case class MessageAccumulator(
networkMsgs: Vector[(ActorRef, NetworkMessage)],

View file

@ -191,6 +191,7 @@ object Deps {
Compile.slf4j,
Compile.scalacheck,
Compile.scalaTest,
Test.akkaTestkit,
Test.ammonite
)

View file

@ -16,7 +16,7 @@ import com.typesafe.config.ConfigFactory
* of this class can be passed in anywhere a wallet,
* chain or node config is required.
*/
case class BitcoinSAppConfig(confs: Config*) {
case class BitcoinSAppConfig(private val confs: Config*) {
val walletConf = WalletAppConfig(confs: _*)
val nodeConf = NodeAppConfig(confs: _*)
val chainConf = ChainAppConfig(confs: _*)
@ -29,6 +29,16 @@ case class BitcoinSAppConfig(confs: Config*) {
Future.sequence(futures).map(_ => ())
}
/** The underlying config the result of our fields derive from */
lazy val config = {
assert(chainConf.config == nodeConf.config)
assert(nodeConf.config == walletConf.config)
// there's nothing special about nodeConf, they should all
// be equal
nodeConf.config
}
}
/**

View file

@ -13,11 +13,17 @@ import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.Client
import org.bitcoins.node.networking.peer.PeerMessageReceiver
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.node.SpvNode
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor.ActorSystem
import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.rpc.util.AsyncUtil
import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.bloom.BloomUpdateAll
/**
* Created by chris on 6/2/16.
*/
abstract class NodeTestUtil {
abstract class NodeTestUtil extends BitcoinSLogger {
//txid on testnet 44e504f5b7649d215be05ad9f09026dee95201244a3b218013c504a6a49a26ff
//this tx has multiple inputs and outputs
@ -54,6 +60,9 @@ abstract class NodeTestUtil {
"721101001f693a1ceb6eabcd03f7f7e22a763efec0f4fd1618a50558283b23600000000000e933b17e7f39aa00f24cabbae1e618ecbfbd70a3ecc4f63072dc790000000000e53160d1edccfd7feed401233c274fbc229f7f0d2b6152735344ec0000000000110274616c7161a8aca8390e1a472f22de7470368e5066f20d050000000000008f98d517947a765069f976de05f910a65743fbac59a430aaf30d350000000000572b328dc2155a853157bebcc616fc82fd6996cb348d6a3cd929c500000000007d2c3eca25e06b684132325c85673675b928c3d0ee2eec553c01000000000000a5e239f5b4c6998078e71a065bd71216583e88bf80a93d170253460000000000b6c926f4cb309d2d87f40d4790905f3f29cd05f3ea26854e060700000000000032668e2de62f181cdeabad351318008288993b3db3e411216aad70000000000079f9e80a6ffe62ab13daa65e6410c8d36d513e198fc161b90cd1d40000000000bbf6a01b2faeb102d177ab03131263166790548181ff3cb04308000000000000b90ef011e62cd6c259939ac4ad372c5f395718a93ade933fd5503b0000000000f346a56f2fa278919c40cdde7ea424058543ce2237f6c9df174e2300000000002fd1795f0dbbc70f7a41d369d4b89c56bf1c6cf2c43ef8f8ed00000000000000643a31c93787ab66b51a4ccfc2ce867d855f4ad64b2a3136e1a12d00000000001323867c8b11027eac79e0cc71fce91f24b1066c6423e69ec409000000000000d1eb1916bab3839da423f1e5aa1c271204bd5564bac6fefd498e0f0000000000a4dcf02c42a71b5b10433917dda89a0d34984a065c0b05a52d03000000000000c57477df9128ef4f71366c4a89e432445d94b0c2b02e7a9ccb060000000000004c45281d6afa17835d264cc8ba181b8c51501247c128d644e2000000000000005c6d201f400a544250bae463ff28f47d53f32d97ae27b5b73b5f580000000000b1def34939f027654943457d69e104304c9798c0af837a7e1f1500000000000069164c8213a0d6b38fe1d9a2c63bcfb5808b65f6e50376726a120000000000002893d0fdafe84e3670a31b22ba80edfb841746462417bad024ac5e0000000000063015920d27befb9ff25f9a1989cda07e4ce62fa9ac8ec0f5b401000000000040a936762fbde4b51bea3ad59dfe202f16dd220761235172960c000000000000332d487a5cc80c00296c43c5bec6b6b1a41a499ce2efd6b6d8514b00000000009400a26083d0551175374c45746488d1c9eaea8d891e69f2e57c5712000000005d62facd94114f5ee55ab6e6797a5c6a8d0e0626b9200ffdf647f15c0000000043497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea3309000000000000000000000000000000000000000000000000000000000000000000000000"
def getHeadersMsg = GetHeadersMessage(rawGetHeadersMsg)
val emptyBloomFilter: BloomFilter =
BloomFilter(numElements = 1, falsePositiveRate = 1, flags = BloomUpdateAll)
/** These are the first 5 block headers on testnet, this does NOT include the genesis block header */
lazy val firstFiveTestNetBlockHeaders: List[BlockHeader] = {
List(
@ -95,6 +104,27 @@ abstract class NodeTestUtil {
Peer.fromNetworkIpAddress(networkIpAddress)
}
/** Checks if the given SPV node and bitcoind is synced */
def isSameBestHash(node: SpvNode, rpc: BitcoindRpcClient)(
implicit ec: ExecutionContext): Future[Boolean] = {
val hashF = rpc.getBestBlockHash
val spvHashF = node.chainApi.getBestBlockHash
for {
spvBestHash <- spvHashF
hash <- hashF
} yield {
spvBestHash == hash
}
}
/** Awaits sync between the given SPV node and bitcoind client */
def awaitSync(node: SpvNode, rpc: BitcoindRpcClient)(
implicit sys: ActorSystem): Future[Unit] = {
import sys.dispatcher
AsyncUtil
.retryUntilSatisfiedF(() => isSameBestHash(node, rpc), 500.milliseconds)
}
}
object NodeTestUtil extends NodeTestUtil

View file

@ -9,13 +9,22 @@ import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.db.AppConfig
import org.bitcoins.node.SpvNode
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.{PeerHandler, PeerMessageReceiver, PeerMessageSender}
import org.bitcoins.node.networking.peer.{
PeerHandler,
PeerMessageReceiver,
PeerMessageSender
}
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.testkit.chain.ChainUnitTest
import org.bitcoins.testkit.fixtures.BitcoinSFixture
import org.bitcoins.testkit.node.fixture.SpvNodeConnectedWithBitcoind
import org.bitcoins.testkit.rpc.BitcoindRpcTestUtil
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FutureOutcome, MustMatchers}
import org.scalatest.{
BeforeAndAfter,
BeforeAndAfterAll,
FutureOutcome,
MustMatchers
}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
@ -60,7 +69,7 @@ trait NodeUnitTest
def buildPeerMessageReceiver(): PeerMessageReceiver = {
val receiver =
PeerMessageReceiver.newReceiver
PeerMessageReceiver.newReceiver()
receiver
}
@ -95,7 +104,10 @@ trait NodeUnitTest
val peer = createPeer(bitcoind)
for {
chainApi <- chainApiF
} yield SpvNode(peer = peer, chainApi = chainApi)
} yield
SpvNode(peer = peer,
chainApi = chainApi,
bloomFilter = NodeTestUtil.emptyBloomFilter)
}
def withSpvNode(test: OneArgAsyncTest)(

View file

@ -1,9 +1,8 @@
package org.bitcoins.wallet.util
package org.bitcoins.testkit.wallet
import akka.actor.ActorSystem
import akka.testkit.TestKit
import org.bitcoins.core.config.RegTest
import org.bitcoins.core.crypto.MnemonicCode
import org.bitcoins.core.protocol.blockchain.ChainParams
import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.rpc.client.common.BitcoindRpcClient
@ -14,7 +13,6 @@ import org.bitcoins.wallet.api.{
InitializeWalletSuccess,
UnlockedWalletApi
}
import org.bitcoins.wallet.config.WalletAppConfig
import org.bitcoins.wallet.db.{WalletDbManagement}
import org.scalatest._
@ -22,7 +20,6 @@ import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import org.bitcoins.db.AppConfig
import org.bitcoins.testkit.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSAppConfig._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

View file

@ -1,4 +1,4 @@
package org.bitcoins.wallet.util
package org.bitcoins.testkit.wallet
import org.bitcoins.core.config.RegTest
import org.bitcoins.core.crypto._
@ -10,7 +10,6 @@ import org.bitcoins.core.protocol.blockchain.{
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.testkit.core.gen.CryptoGenerators
import org.bitcoins.wallet.models.AccountDb
import org.bitcoins.wallet.HDUtil
import scodec.bits.HexStringSyntax
import org.bitcoins.core.hd._
import org.bitcoins.core.protocol.script.ScriptWitness

View file

@ -1,7 +1,7 @@
package org.bitcoins.wallet
import org.bitcoins.wallet.api.UnlockedWalletApi
import org.bitcoins.wallet.util.BitcoinSWalletTest
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest.FutureOutcome
import org.bitcoins.wallet.api.UnlockWalletError.BadPassword
import org.bitcoins.wallet.api.UnlockWalletError.JsonParsingError

View file

@ -1,7 +1,7 @@
package org.bitcoins.wallet
import org.bitcoins.wallet.api.UnlockedWalletApi
import org.bitcoins.wallet.util.BitcoinSWalletTest
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest.FutureOutcome
import org.bitcoins.wallet.api.UnlockWalletError.BadPassword
import org.bitcoins.wallet.api.UnlockWalletError.JsonParsingError

View file

@ -5,7 +5,7 @@ import org.bitcoins.core.number.UInt32
import org.bitcoins.core.wallet.fee.SatoshisPerByte
import org.bitcoins.testkit.rpc.BitcoindRpcTestUtil
import org.bitcoins.wallet.api.{AddUtxoError, AddUtxoSuccess, WalletApi}
import org.bitcoins.wallet.util.BitcoinSWalletTest
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest.FutureOutcome
import scala.concurrent.Future

View file

@ -1,6 +1,6 @@
package org.bitcoins.wallet
import org.bitcoins.wallet.util.BitcoinSWalletTest
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest.FutureOutcome
import org.bitcoins.testkit.fixtures.EmptyFixture
import org.bitcoins.testkit.core.gen.CryptoGenerators

View file

@ -1,7 +1,7 @@
package org.bitcoins.wallet
import org.bitcoins.wallet.api.UnlockedWalletApi
import org.bitcoins.wallet.util.BitcoinSWalletTest
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest.FutureOutcome
import org.bitcoins.wallet.api.UnlockWalletError.BadPassword
import org.bitcoins.wallet.api.UnlockWalletError.JsonParsingError

View file

@ -2,20 +2,20 @@ package org.bitcoins.wallet.fixtures
import org.bitcoins.wallet.db.WalletDbManagement
import org.bitcoins.wallet.models.AccountDAO
import org.bitcoins.wallet.util.BitcoinSWalletTest
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest._
import scala.concurrent.Future
import org.bitcoins.wallet.config.WalletAppConfig
trait AccountDAOFixture extends fixture.AsyncFlatSpec with BitcoinSWalletTest {
override final type FixtureParam = AccountDAO
final override type FixtureParam = AccountDAO
// to get around the config in `BitcoinSWalletTest` not resolving
// as an AppConfig
private implicit val walletConfig: WalletAppConfig = config.walletConf
implicit private val walletConfig: WalletAppConfig = config.walletConf
override final def withFixture(test: OneArgAsyncTest): FutureOutcome =
final override def withFixture(test: OneArgAsyncTest): FutureOutcome =
makeDependentFixture(createAccountTable, dropAccountTable)(test)
private def dropAccountTable(accountDAO: AccountDAO): Future[Unit] = {

View file

@ -4,7 +4,7 @@ import scala.concurrent.Future
import org.bitcoins.wallet.db.WalletDbManagement
import org.bitcoins.wallet.models.{AccountDAO, AddressDAO}
import org.bitcoins.wallet.util.BitcoinSWalletTest
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest._
import org.bitcoins.wallet.config.WalletAppConfig
@ -14,14 +14,14 @@ import org.bitcoins.wallet.config.WalletAppConfig
*/
trait AddressDAOFixture extends fixture.AsyncFlatSpec with BitcoinSWalletTest {
override final type FixtureParam = (AccountDAO, AddressDAO)
final override type FixtureParam = (AccountDAO, AddressDAO)
override final def withFixture(test: OneArgAsyncTest): FutureOutcome =
final override def withFixture(test: OneArgAsyncTest): FutureOutcome =
makeDependentFixture(createTables, dropTables)(test)
// to get around the config in `BitcoinSWalletTest` not resolving
// as an AppConfig
private implicit val walletConfig: WalletAppConfig = config.walletConf
implicit private val walletConfig: WalletAppConfig = config.walletConf
private def dropTables(daos: FixtureParam): Future[Unit] = {
val (account, address) = daos

View file

@ -1,7 +1,7 @@
package org.bitcoins.wallet.fixtures
import org.bitcoins.wallet.db.WalletDbManagement
import org.bitcoins.wallet.util.BitcoinSWalletTest
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest._
import slick.jdbc.SQLiteProfile.api._

View file

@ -2,7 +2,7 @@ package org.bitcoins.wallet.fixtures
import org.bitcoins.wallet.db.WalletDbManagement
import org.bitcoins.wallet.models.UTXOSpendingInfoDAO
import org.bitcoins.wallet.util.BitcoinSWalletTest
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest._
import scala.concurrent.Future

View file

@ -2,7 +2,7 @@ package org.bitcoins.wallet.models
import org.bitcoins.testkit.core.gen.CryptoGenerators
import org.bitcoins.wallet.fixtures.AccountDAOFixture
import org.bitcoins.wallet.util.{BitcoinSWalletTest, WalletTestUtil}
import org.bitcoins.testkit.wallet.{BitcoinSWalletTest, WalletTestUtil}
class AccountDAOTest extends BitcoinSWalletTest with AccountDAOFixture {

View file

@ -8,7 +8,7 @@ import org.bitcoins.core.protocol.P2SHAddress
import org.bitcoins.core.script.ScriptType
import org.bitcoins.core.util.CryptoUtil
import org.bitcoins.wallet.fixtures.AddressDAOFixture
import org.bitcoins.wallet.util.{BitcoinSWalletTest, WalletTestUtil}
import org.bitcoins.testkit.wallet.{BitcoinSWalletTest, WalletTestUtil}
import org.bitcoins.core.hd.HDChainType
import org.bitcoins.core.hd.SegWitHDPath
import org.bitcoins.wallet.Wallet

View file

@ -7,8 +7,8 @@ import org.bitcoins.core.protocol.transaction.{
}
import org.bitcoins.wallet.fixtures.UtxoDAOFixture
import org.bitcoins.wallet.Wallet
import org.bitcoins.wallet.util.WalletTestUtil
import org.bitcoins.wallet.util.BitcoinSWalletTest
import org.bitcoins.testkit.wallet.WalletTestUtil
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
class UTXOSpendingInfoDAOTest extends BitcoinSWalletTest with UtxoDAOFixture {
behavior of "UTXOSpendingInfoDAO"