2019 08 05 broadcast tx test (#680)

* Start refactoring BroadcastTransactionTest to use new SpvNodeFundedWalletBitcoind fixture

* Double timeout to 10.seconds on spv node disconnect

* Attempt to bump disconnect even more

* Add more logging around starting and stoping spv node to try and debug failures with fixtures on test suites

* move starting of spv node into one place in the fixtures -- createSpvNode() --, also look at ordering of destruction to make sure that the spv node is stopped/destroyed BEFORE bitcoind is stopped/destroyed. If the inverse order happens, our spv node won't receive a response from bitcoind and will hang waiting for the Tcp connection to close gracefully

* Try switching from  to  to get CI to pass

* Add more logging and throws to try and debug failures on ci

* Add logging in the case we send a close command and peer isn't defined

* Match on Tcp.Connect on default receive in P2PClient

* Make SpvNode.disconnect() check to see if we are connected before attempting to disconnect

* Add more logging in fixtures, change behavior of SpvNode.disconnect() to not throw if we are already disconnected

* Switch to preferred way of terminating actor system in tests

* Make PeerMessageSender.sendMsg() return a , make sure a message is not sent to P2PClient until we are fully intialized

* Switch p2p logging to DEBUG

* Complete disconnect promise on peer message recv in the case where we didn't have a peer to begin with

* Make SpvNodeTest more robust, fix bug for when we send 'sendheaders' message. We shouldn't do this until our peer has sent us a verack

* Only send 'sendheaders' message to our peer after we receive a 'sendheaders' message from our peer. Hopefully this solves async issues where we were sending 'sendheaders' before our handshake was fully completed with our peer which means we wouldn't get headers sent to us

* Cleanup pt1
This commit is contained in:
Chris Stewart 2019-08-14 07:18:36 -05:00 committed by GitHub
parent 6369c64b6f
commit ff051ac7a2
15 changed files with 159 additions and 110 deletions

View File

@ -255,7 +255,6 @@ trait Client extends BitcoinSLogger {
methodName: String,
params: JsArray): HttpRequest = {
val uuid = UUID.randomUUID().toString
val m: Map[String, JsValue] = Map("method" -> JsString(methodName),
"params" -> params,
"id" -> JsString(uuid))

View File

@ -13,7 +13,7 @@ bitcoin-s {
# The available loggers are:
# incoming and outgoing P2P messages
# p2p = info
#p2p = info
# verification of block headers, merkle trees
# chain-verification = info

View File

@ -1,41 +1,26 @@
package org.bitcoins.node
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.scalatest.FutureOutcome
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.chain.blockchain.ChainHandler
import org.bitcoins.testkit.node.NodeTestUtil
import org.bitcoins.core.currency._
import org.bitcoins.core.wallet.fee.SatoshisPerByte
import org.bitcoins.rpc.util.AsyncUtil
import org.bitcoins.rpc.BitcoindException
import org.bitcoins.core.protocol.transaction.Transaction
import org.scalactic.Bool
import org.bitcoins.core.wallet.fee.SatoshisPerByte
import org.bitcoins.rpc.BitcoindException
import org.bitcoins.testkit.async.TestAsyncUtil
import org.bitcoins.testkit.node.NodeUnitTest.SpvNodeFundedWalletBitcoind
import org.bitcoins.testkit.node.{NodeTestUtil, NodeUnitTest}
import org.scalatest.FutureOutcome
import scala.concurrent.Future
import scala.concurrent.duration._
import org.bitcoins.testkit.async.TestAsyncUtil
import org.bitcoins.testkit.wallet.BitcoinSWalletTest.WalletWithBitcoind
class BroadcastTransactionTest extends BitcoinSWalletTest {
class BroadcastTransactionTest extends NodeUnitTest {
override type FixtureParam = WalletWithBitcoind
override type FixtureParam = SpvNodeFundedWalletBitcoind
def withFixture(test: OneArgAsyncTest): FutureOutcome =
withFundedWalletAndBitcoind(test)
withSpvNodeFundedWalletBitcoind(test, SpvNodeCallbacks.empty)
it must "broadcast a transaction" in { param =>
val WalletWithBitcoind(wallet, rpc) = param
/**
* This is not ideal, how do we get one implicit value (`config`)
* to resolve to multiple implicit parameters?
*/
implicit val nodeConfig: NodeAppConfig = config
implicit val chainConfig: ChainAppConfig = config
val SpvNodeFundedWalletBitcoind(spv, wallet, rpc) = param
def hasSeenTx(transaction: Transaction): Future[Boolean] = {
rpc
@ -54,16 +39,9 @@ class BroadcastTransactionTest extends BitcoinSWalletTest {
}
for {
_ <- config.initialize()
address <- rpc.getNewAddress
bloom <- wallet.getBloomFilter()
spv <- {
val peer = Peer.fromBitcoind(rpc.instance)
val spv = SpvNode(peer, bloomFilter = bloom)
spv.start()
}
_ <- spv.sync()
_ <- NodeTestUtil.awaitSync(spv, rpc)

View File

@ -58,7 +58,7 @@ class NodeWithWalletTest extends NodeUnitTest {
it must "load a bloom filter and receive information about received payments" in {
param =>
val SpvNodeFundedWalletBitcoind(initSpv, wallet, rpc) = param
val SpvNodeFundedWalletBitcoind(spv, wallet, rpc) = param
walletP.success(wallet)
@ -67,7 +67,7 @@ class NodeWithWalletTest extends NodeUnitTest {
def processWalletTx(tx: DoubleSha256DigestBE): DoubleSha256DigestBE = {
expectedTxIdP.success(tx.flip)
// how long we're waiting for a tx notify before failing the test
val delay = 15.seconds
val delay = 25.seconds
val failTest: Runnable = new Runnable {
override def run = {
@ -89,7 +89,6 @@ class NodeWithWalletTest extends NodeUnitTest {
bloom <- wallet.getBloomFilter()
address <- wallet.getNewAddress()
spv <- initSpv.start()
updatedBloom <- spv.updateBloomFilter(address).map(_.bloomFilter)
_ <- spv.sync()
_ <- NodeTestUtil.awaitSync(spv, rpc)

View File

@ -3,7 +3,7 @@ package org.bitcoins.node
import akka.actor.ActorSystem
import org.bitcoins.core.crypto.DoubleSha256DigestBE
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.rpc.util.RpcUtil
import org.bitcoins.rpc.util.{AsyncUtil, RpcUtil}
import org.bitcoins.testkit.node.NodeUnitTest
import org.bitcoins.testkit.node.fixture.SpvNodeConnectedWithBitcoind
import org.scalatest.FutureOutcome
@ -64,8 +64,12 @@ class SpvNodeTest extends NodeUnitTest {
//as they happen with the 'sendheaders' message
//both our spv node and our bitcoind node _should_ both be at the genesis block (regtest)
//at this point so no actual syncing is happening
val initSyncF = gen1F.flatMap { _ =>
spvNode.sync()
val initSyncF = gen1F.flatMap { hashes =>
val syncF = spvNode.sync()
for {
_ <- syncF
_ <- NodeTestUtil.awaitBestHash(hashes.head, spvNode)
} yield ()
}
//start generating a block every 10 seconds with bitcoind
@ -82,7 +86,7 @@ class SpvNodeTest extends NodeUnitTest {
val has6BlocksF = RpcUtil.retryUntilSatisfiedF(
conditionF =
() => spvNode.chainApiFromDb().flatMap(_.getBlockCount.map(_ == 6)),
duration = 1.seconds)
duration = 250.millis)
has6BlocksF.map(_ => succeed)
}
@ -107,6 +111,6 @@ class SpvNodeTest extends NodeUnitTest {
}
}
system.scheduler.schedule(interval, interval, genBlock)
system.scheduler.schedule(2.second, interval, genBlock)
}
}

View File

@ -72,7 +72,7 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter {
}
it must "update the bloom filter with an address" in { param =>
val SpvNodeFundedWalletBitcoind(initSpv, wallet, rpc) = param
val SpvNodeFundedWalletBitcoind(spv, wallet, rpc) = param
// we want to schedule a runnable that aborts
// the test after a timeout, but then
@ -87,7 +87,6 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter {
// is calculated
addressFromWallet <- wallet.getNewAddress()
_ = addressFromWalletP.success(addressFromWallet)
spv <- initSpv.start()
_ <- spv.updateBloomFilter(addressFromWallet)
_ <- spv.sync()
_ <- rpc.sendToAddress(addressFromWallet, 1.bitcoin)
@ -113,7 +112,7 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter {
}
it must "update the bloom filter with a TX" in { param =>
val SpvNodeFundedWalletBitcoind(initSpv, wallet, rpc) = param
val SpvNodeFundedWalletBitcoind(spv, wallet, rpc) = param
// we want to schedule a runnable that aborts
// the test after a timeout, but then
// we need to cancel that runnable once
@ -123,7 +122,6 @@ class UpdateBloomFilterTest extends NodeUnitTest with BeforeAndAfter {
for {
firstBloom <- wallet.getBloomFilter()
spv <- initSpv.start()
addressFromBitcoind <- rpc.getNewAddress
tx <- wallet
.sendToAddress(addressFromBitcoind,

View File

@ -140,7 +140,6 @@ class P2PClientTest
it must "establish a tcp connection with a bitcoin node" in {
bitcoindPeerF.flatMap { remote =>
println(s"Starting test")
connectAndDisconnect(remote)
}
}

View File

@ -120,11 +120,13 @@ case class SpvNode(
* `private[node]`.
*/
private[node] def send(msg: NetworkPayload): Future[Unit] = {
peerMsgSenderF.map(_.sendMsg(msg))
peerMsgSenderF.flatMap(_.sendMsg(msg))
}
/** Starts our spv node */
def start(): Future[SpvNode] = {
logger(nodeAppConfig).info("Starting spv node")
val start = System.currentTimeMillis()
for {
_ <- nodeAppConfig.initialize()
node <- {
@ -140,6 +142,9 @@ case class SpvNode(
isInitializedF.map { _ =>
logger(nodeAppConfig).info(s"Our peer=${peer} has been initialized")
logger(nodeAppConfig).info(
s"Our spv node has been full started. It took=${System
.currentTimeMillis() - start}ms")
this
}
}
@ -154,15 +159,21 @@ case class SpvNode(
/** Stops our spv node */
def stop(): Future[SpvNode] = {
logger(nodeAppConfig).info(s"Stopping spv node")
val disconnectF = peerMsgSenderF.map(_.disconnect())
val disconnectF = for {
p <- peerMsgSenderF
disconnect <- p.disconnect()
} yield disconnect
val start = System.currentTimeMillis()
val isStoppedF = disconnectF.flatMap { _ =>
logger(nodeAppConfig).info(s"Awaiting disconnect")
AsyncUtil.retryUntilSatisfiedF(() => isDisconnected)
//25 seconds to disconnect
AsyncUtil.retryUntilSatisfiedF(() => isDisconnected, 500.millis)
}
isStoppedF.map { _ =>
logger(nodeAppConfig).info(s"Spv node stopped!")
logger(nodeAppConfig).info(
s"Spv node stopped! It took=${System.currentTimeMillis() - start}ms")
this
}
}
@ -181,19 +192,19 @@ case class SpvNode(
}
logger(nodeAppConfig).info(s"Sending out inv for tx=${transaction.txIdBE}")
peerMsgSenderF.map(_.sendInventoryMessage(transaction))
peerMsgSenderF.flatMap(_.sendInventoryMessage(transaction))
}
/** Checks if we have a tcp connection with our peer */
def isConnected: Future[Boolean] = clientF.flatMap(_.isConnected)
def isConnected: Future[Boolean] = peerMsgSenderF.flatMap(_.isConnected)
/** Checks if we are fully initialized with our peer and have executed the handshake
* This means we can now send arbitrary messages to our peer
* @return
*/
def isInitialized: Future[Boolean] = clientF.flatMap(_.isInitialized)
def isInitialized: Future[Boolean] = peerMsgSenderF.flatMap(_.isInitialized)
def isDisconnected: Future[Boolean] = clientF.flatMap(_.isDisconnected)
def isDisconnected: Future[Boolean] = peerMsgSenderF.flatMap(_.isDisconnected)
/** Starts to sync our spv node with our peer
* If our local best block hash is the same as our peers

View File

@ -92,7 +92,6 @@ case class P2PClientActor(
Await.result(handleTcpMessage(message, Some(peer), unalignedBytes),
timeout)
context.become(awaitNetworkRequest(peer, newUnalignedBytes))
case metaMsg: P2PClient.MetaMsg =>
sender ! handleMetaMsg(metaMsg)
}
@ -105,7 +104,7 @@ case class P2PClientActor(
//after receiving Tcp.Connected we switch to the
//'awaitNetworkRequest' context. This is the main
//execution loop for the Client actor
handleCommand(cmd, peer = None)
handleCommand(cmd, peerOpt = None)
case connected: Tcp.Connected =>
Await.result(handleEvent(connected, unalignedBytes = ByteVector.empty),
@ -246,10 +245,18 @@ case class P2PClientActor(
*/
private def handleCommand(
command: Tcp.Command,
peer: Option[ActorRef]): Unit =
peerOpt: Option[ActorRef]): Unit =
command match {
case closeCmd @ (Tcp.ConfirmedClose | Tcp.Close | Tcp.Abort) =>
peer.map(p => p ! closeCmd)
peerOpt match {
case Some(peer) => peer ! closeCmd
case None =>
logger.error(
s"Failing to disconnect node because we do not have peer defined!")
val newPeerMsgHandlerRecvF = currentPeerMsgHandlerRecv.disconnect()
currentPeerMsgHandlerRecv =
Await.result(newPeerMsgHandlerRecvF, timeout)
}
()
case connectCmd: Tcp.Connect =>
manager ! connectCmd

View File

@ -173,10 +173,6 @@ class PeerMessageReceiver(
sender.sendVerackMessage()
//we want peers to just send us headers
//we don't want to have to request them manually
sender.sendHeadersMessage()
val newRecv = toState(newState)
Future.successful(newRecv)
@ -199,7 +195,9 @@ class PeerMessageReceiver(
sender.sendPong(ping)
Future.successful(this)
case SendHeadersMessage =>
//not implemented as of now
//we want peers to just send us headers
//we don't want to have to request them manually
sender.sendHeadersMessage()
Future.successful(this)
case _: AddrMessage =>
Future.successful(this)

View File

@ -38,7 +38,7 @@ sealed abstract class PeerMessageReceiverState {
}
def isDisconnected: Boolean = {
clientDisconnectF.isCompleted
clientDisconnectF.isCompleted && !isConnected
}
def versionMsgP: Promise[VersionMessage]

View File

@ -2,6 +2,7 @@ package org.bitcoins.node.networking.peer
import akka.actor.ActorRef
import akka.io.Tcp
import akka.util.Timeout
import org.bitcoins.core.crypto.DoubleSha256Digest
import org.bitcoins.core.p2p.NetworkMessage
import org.bitcoins.core.p2p._
@ -12,10 +13,15 @@ import org.bitcoins.db.P2PLogger
import org.bitcoins.core.crypto.HashDigest
import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.core.util.FutureUtil
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
extends P2PLogger {
private val socket = client.peer.socket
implicit private val timeout = Timeout(10.seconds)
/** Initiates a connection with the given peer */
def connect(): Unit = {
@ -23,38 +29,62 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
(client.actor ! Tcp.Connect(socket))
}
def isConnected()(implicit ec: ExecutionContext): Future[Boolean] = {
client.isConnected()
}
def isInitialized()(implicit ec: ExecutionContext): Future[Boolean] = {
client.isInitialized()
}
def isDisconnected()(implicit ec: ExecutionContext): Future[Boolean] = {
client.isDisconnected()
}
/** Disconnects the given peer */
def disconnect(): Unit = {
logger.info(s"Disconnecting peer at socket=${socket}")
(client.actor ! Tcp.Close)
def disconnect()(implicit ec: ExecutionContext): Future[Unit] = {
isConnected().flatMap {
case true =>
logger.info(s"Disconnecting peer at socket=${socket}")
(client.actor ! Tcp.Close)
FutureUtil.unit
case false =>
val err =
s"Cannot disconnect client that is not connected to socket=${socket}!"
logger.warn(err)
FutureUtil.unit
}
}
/** Sends a [[org.bitcoins.core.p2p.VersionMessage VersionMessage]] to our peer */
def sendVersionMessage(): Unit = {
def sendVersionMessage()(implicit ec: ExecutionContext): Future[Unit] = {
val versionMsg = VersionMessage(client.peer.socket, conf.network)
logger.trace(s"Sending versionMsg=$versionMsg to peer=${client.peer}")
sendMsg(versionMsg)
}
def sendVerackMessage(): Unit = {
def sendVerackMessage()(implicit ec: ExecutionContext): Future[Unit] = {
val verackMsg = VerAckMessage
sendMsg(verackMsg)
}
/** Responds to a ping message */
def sendPong(ping: PingMessage): Unit = {
def sendPong(ping: PingMessage)(
implicit ec: ExecutionContext): Future[Unit] = {
val pong = PongMessage(ping.nonce)
logger.trace(s"Sending pong=$pong to peer=${client.peer}")
sendMsg(pong)
}
def sendGetHeadersMessage(lastHash: DoubleSha256Digest): Unit = {
def sendGetHeadersMessage(lastHash: DoubleSha256Digest)(
implicit ec: ExecutionContext): Future[Unit] = {
val headersMsg = GetHeadersMessage(lastHash)
logger.trace(s"Sending getheaders=$headersMsg to peer=${client.peer}")
sendMsg(headersMsg)
}
def sendHeadersMessage(): Unit = {
def sendHeadersMessage()(implicit ec: ExecutionContext): Future[Unit] = {
val sendHeadersMsg = SendHeadersMessage
sendMsg(sendHeadersMsg)
}
@ -62,7 +92,8 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
/**
* Sends a inventory message with the given transactions
*/
def sendInventoryMessage(transactions: Transaction*): Unit = {
def sendInventoryMessage(transactions: Transaction*)(
implicit ec: ExecutionContext): Future[Unit] = {
val inventories =
transactions.map(tx => Inventory(TypeIdentifier.MsgTx, tx.txId))
val message = InventoryMessage(inventories)
@ -70,30 +101,34 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
sendMsg(message)
}
def sendFilterClearMessage(): Unit = {
def sendFilterClearMessage()(implicit ec: ExecutionContext): Future[Unit] = {
sendMsg(FilterClearMessage)
}
def sendFilterAddMessage(hash: HashDigest): Unit = {
def sendFilterAddMessage(hash: HashDigest)(
implicit ec: ExecutionContext): Future[Unit] = {
val message = FilterAddMessage.fromHash(hash)
logger.trace(s"Sending filteradd=$message to peer=${client.peer}")
sendMsg(message)
}
def sendFilterLoadMessage(bloom: BloomFilter): Unit = {
def sendFilterLoadMessage(bloom: BloomFilter)(
implicit ec: ExecutionContext): Future[Unit] = {
val message = FilterLoadMessage(bloom)
logger.trace(s"Sending filterload=$message to peer=${client.peer}")
sendMsg(message)
}
def sendTransactionMessage(transaction: Transaction): Unit = {
def sendTransactionMessage(transaction: Transaction)(
implicit ec: ExecutionContext): Future[Unit] = {
val message = TransactionMessage(transaction)
logger.trace(s"Sending txmessage=$message to peer=${client.peer}")
sendMsg(message)
}
/** Sends a request for filtered blocks matching the given headers */
def sendGetDataMessage(headers: BlockHeader*): Unit = {
def sendGetDataMessage(headers: BlockHeader*)(
implicit ec: ExecutionContext): Future[Unit] = {
val inventories =
headers.map(header =>
Inventory(TypeIdentifier.MsgFilteredBlock, header.hash))
@ -102,10 +137,13 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
sendMsg(message)
}
private[node] def sendMsg(msg: NetworkPayload): Unit = {
logger.debug(s"Sending msg=${msg.commandName} to peer=${socket}")
val newtworkMsg = NetworkMessage(conf.network, msg)
client.actor ! newtworkMsg
private[node] def sendMsg(msg: NetworkPayload)(
implicit ec: ExecutionContext): Future[Unit] = {
isInitialized().map { _ =>
logger.debug(s"Sending msg=${msg.commandName} to peer=${socket}")
val newtworkMsg = NetworkMessage(conf.network, msg)
client.actor ! newtworkMsg
}
}
}

View File

@ -25,6 +25,11 @@ trait BitcoinSFixture extends fixture.AsyncFlatSpec with BitcoinSLogger {
destroy: T => Future[Any])(test: OneArgAsyncTest): FutureOutcome = {
val fixtureF = build()
fixtureF.failed.foreach { err =>
println(s"Failed to build fixture with err=${err}")
throw err
}
val outcomeF = fixtureF.flatMap { fixture =>
test(fixture.asInstanceOf[FixtureParam]).toFuture
}
@ -43,6 +48,11 @@ trait BitcoinSFixture extends fixture.AsyncFlatSpec with BitcoinSLogger {
val outcomeAfterDestroyF = destroyP.future.flatMap(_ => outcomeF)
outcomeAfterDestroyF.failed.foreach { err =>
println(s"err creating and destroying fixture")
throw err
}
new FutureOutcome(outcomeAfterDestroyF)
}

View File

@ -1,8 +1,7 @@
package org.bitcoins.testkit.node
import java.net.InetSocketAddress
import akka.actor.ActorRefFactory
import java.net.InetSocketAddress
import org.bitcoins.core.p2p.NetworkMessage
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.core.protocol.transaction.Transaction
@ -14,6 +13,7 @@ import org.bitcoins.node.networking.P2PClient
import org.bitcoins.node.networking.peer.PeerMessageReceiver
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.node.SpvNode
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
@ -22,6 +22,7 @@ import org.bitcoins.core.util.BitcoinSLogger
import org.bitcoins.testkit.async.TestAsyncUtil
import org.bitcoins.core.bloom.BloomFilter
import org.bitcoins.core.bloom.BloomUpdateAll
import org.bitcoins.core.crypto.DoubleSha256DigestBE
abstract class NodeTestUtil extends BitcoinSLogger {
@ -128,17 +129,22 @@ abstract class NodeTestUtil extends BitcoinSLogger {
} yield rpcCount == spvCount
}
/** Awaits sync between the given SPV node and bitcoind client
*
* TODO: We should check for hash, not block height. however,
* our way of determining what the best hash is when having
* multiple tips is not good enough yet
*/
/** Awaits sync between the given SPV node and bitcoind client */
def awaitSync(node: SpvNode, rpc: BitcoindRpcClient)(
implicit sys: ActorSystem): Future[Unit] = {
import sys.dispatcher
TestAsyncUtil
.retryUntilSatisfiedF(() => isSameBlockCount(node, rpc), 500.milliseconds)
.retryUntilSatisfiedF(() => isSameBestHash(node, rpc), 500.milliseconds)
}
/** The future doesn't complete until the spv nodes best hash is the given hash */
def awaitBestHash(hash: DoubleSha256DigestBE, spvNode: SpvNode)(
implicit system: ActorSystem): Future[Unit] = {
import system.dispatcher
def spvBestHashF: Future[DoubleSha256DigestBE] = {
spvNode.chainApiFromDb().flatMap(_.getBestBlockHash)
}
TestAsyncUtil.retryUntilSatisfiedF(() => spvBestHashF.map(_ == hash))
}
}

View File

@ -1,11 +1,8 @@
package org.bitcoins.testkit.node
import java.net.InetSocketAddress
import akka.actor.ActorSystem
import org.bitcoins.chain.blockchain.ChainHandler
import akka.testkit.TestKit
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.chain.api.ChainApi
import org.bitcoins.core.config.NetworkParameters
import org.bitcoins.core.util.BitcoinSLogger
@ -36,6 +33,7 @@ import org.scalatest.{
FutureOutcome,
MustMatchers
}
import java.net.InetSocketAddress
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
@ -52,7 +50,7 @@ trait NodeUnitTest
}
override def afterAll(): Unit = {
system.terminate()
TestKit.shutdownActorSystem(system, verifySystemShutdown = true)
()
}
@ -108,21 +106,19 @@ trait NodeUnitTest
() =>
val bitcoindF = BitcoinSFixture.createBitcoind()
bitcoindF.flatMap { bitcoind =>
val spvNode = NodeUnitTest
val spvNodeF = NodeUnitTest
.createSpvNode(bitcoind, SpvNodeCallbacks.empty)(
system,
appConfig.chainConf,
appConfig.nodeConf)
val startedSpv = spvNode
.flatMap(_.start())
startedSpv.map(spv => SpvNodeConnectedWithBitcoind(spv, bitcoind))
spvNodeF.map(spv => SpvNodeConnectedWithBitcoind(spv, bitcoind))
}
}
makeDependentFixture(
build = spvWithBitcoindBuilder,
destroy = NodeUnitTest.destorySpvNodeConnectedWithBitcoind(
destroy = NodeUnitTest.destroySpvNodeConnectedWithBitcoind(
_: SpvNodeConnectedWithBitcoind)(system, appConfig)
)(test)
}
@ -195,7 +191,7 @@ object NodeUnitTest extends BitcoinSLogger {
stopF.flatMap(_ => ChainUnitTest.destroyHeaderTable())
}
def destorySpvNodeConnectedWithBitcoind(
def destroySpvNodeConnectedWithBitcoind(
spvNodeConnectedWithBitcoind: SpvNodeConnectedWithBitcoind)(
implicit system: ActorSystem,
appConfig: BitcoinSAppConfig): Future[Unit] = {
@ -203,16 +199,15 @@ object NodeUnitTest extends BitcoinSLogger {
import system.dispatcher
val spvNode = spvNodeConnectedWithBitcoind.spvNode
val bitcoind = spvNodeConnectedWithBitcoind.bitcoind
val spvNodeDestroyF = destroySpvNode(spvNode)
val bitcoindDestroyF = ChainUnitTest.destroyBitcoind(bitcoind)
for {
_ <- spvNodeDestroyF
_ <- bitcoindDestroyF
val resultF = for {
_ <- destroySpvNode(spvNode)
_ <- ChainUnitTest.destroyBitcoind(bitcoind)
} yield {
logger.debug(s"Done with teardown of spv node connected with bitcoind!")
()
}
resultF
}
/** Creates a spv node, a funded bitcoin-s wallet, all of which are connected to bitcoind */
@ -240,6 +235,9 @@ object NodeUnitTest extends BitcoinSLogger {
BitcoinSWalletTest.WalletWithBitcoind(fundedWalletBitcoind.wallet,
fundedWalletBitcoind.bitcoindRpc)
}
//these need to be done in order, as the spv node needs to be
//stopped before the bitcoind node is stopped
val destroyedF = for {
_ <- destroySpvNode(fundedWalletBitcoind.spvNode)
_ <- BitcoinSWalletTest.destroyWalletWithBitcoind(walletWithBitcoind)
@ -271,6 +269,8 @@ object NodeUnitTest extends BitcoinSLogger {
Peer(id = None, socket = socket)
}
/** Creates a spv node peered with the given bitcoind client, this method
* also calls [[org.bitcoins.node.SpvNode.start() start]] to start the node */
def createSpvNode(bitcoind: BitcoindRpcClient, callbacks: SpvNodeCallbacks)(
implicit system: ActorSystem,
chainAppConfig: ChainAppConfig,
@ -278,13 +278,15 @@ object NodeUnitTest extends BitcoinSLogger {
import system.dispatcher
val chainApiF = ChainUnitTest.createChainHandler()
val peer = createPeer(bitcoind)
for {
val spvNodeF = for {
_ <- chainApiF
} yield {
SpvNode(peer = peer,
bloomFilter = NodeTestUtil.emptyBloomFilter,
callbacks = callbacks)
}
spvNodeF.flatMap(_.start())
}
}