diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000000..26d33521af --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala index bed89639bc..ab74f264db 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala @@ -84,18 +84,20 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit "No peers specified, unable to start node") } - val peerSocket = - NetworkUtil.parseInetSocketAddress(nodeConf.peers.head, - nodeConf.network.port) + val peerSockets = { + nodeConf.peers.map( + NetworkUtil.parseInetSocketAddress(_, nodeConf.network.port) + ) + } - val peer = Peer.fromSocket(peerSocket, nodeConf.socks5ProxyParams) + val peers = peerSockets.map(Peer.fromSocket(_, nodeConf.socks5ProxyParams)) //run chain work migration val chainApiF = runChainWorkCalc( serverArgParser.forceChainWorkRecalc || chainConf.forceRecalcChainWork) //get a node that isn't started - val nodeF = nodeConf.createNode(peer)(chainConf, system) + val nodeF = nodeConf.createNode(peers)(chainConf, system) val feeProvider = getFeeProviderOrElse( MempoolSpaceProvider(HourFeeTarget, walletConf.network)) diff --git a/docs/node/node.md b/docs/node/node.md index 4f7e7d8a03..c45335bb6c 100644 --- a/docs/node/node.md +++ b/docs/node/node.md @@ -113,7 +113,7 @@ val nodeF = for { peer <- peerF } yield { val dataMessageHandler = DataMessageHandler(chainApi) - NeutrinoNode(nodePeer = peer, + NeutrinoNode(nodePeer = Vector(peer), dataMessageHandler = dataMessageHandler, nodeConfig = nodeConfig, chainConfig = chainConfig, diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala index 40a21ee0ee..cc77fb621e 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala @@ -4,27 +4,24 @@ import akka.actor.Cancellable import org.bitcoins.crypto.DoubleSha256DigestBE import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.testkit.BitcoinSTestAppConfig -import org.bitcoins.testkit.node.fixture.NeutrinoNodeConnectedWithBitcoind -import org.bitcoins.testkit.node.{ - NodeTestUtil, - NodeTestWithCachedBitcoindNewest -} +import org.bitcoins.testkit.node.fixture.NeutrinoNodeConnectedWithBitcoinds +import org.bitcoins.testkit.node.{NodeTestUtil, NodeTestWithCachedBitcoindPair} import org.scalatest.{FutureOutcome, Outcome} import scala.concurrent.Future -class NeutrinoNodeTest extends NodeTestWithCachedBitcoindNewest { +class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair { /** Wallet config with data directory set to user temp directory */ override protected def getFreshConfig: BitcoinSAppConfig = BitcoinSTestAppConfig.getNeutrinoWithEmbeddedDbTestConfig(pgUrl) - override type FixtureParam = NeutrinoNodeConnectedWithBitcoind + override type FixtureParam = NeutrinoNodeConnectedWithBitcoinds override def withFixture(test: OneArgAsyncTest): FutureOutcome = { val outcomeF: Future[Outcome] = for { - bitcoind <- cachedBitcoindWithFundsF - outcome = withNeutrinoNodeConnectedToBitcoind(test, bitcoind)( + bitcoinds <- clientsF + outcome = withNeutrinoNodeConnectedToBitcoinds(test, bitcoinds.toVector)( system, getFreshConfig) f <- outcome.toFuture @@ -34,15 +31,37 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindNewest { behavior of "NeutrinoNode" + it must "be able to connect and then disconnect from all peers" in { + nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoinds => + //checking all peers are connected + val node = nodeConnectedWithBitcoind.node + val connFs = node.peers.indices.map(node.isConnected) + val connF = Future.sequence(connFs).map(_.forall(_ == true)) + val assertion1F = connF.map(assert(_)) + + //checking all peers can be disconnected + def isAllDisconnectedF: Future[Boolean] = { + val disconnFs = node.peers.indices.map(node.isDisconnected) + val res = Future.sequence(disconnFs).map(_.forall(_ == true)) + res + } + val disconnF = for { + _ <- assertion1F + _ <- node.stop() + f <- isAllDisconnectedF + } yield f + disconnF.map(assert(_)) + } + it must "receive notification that a block occurred on the p2p network for neutrino" in { - nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoind => + nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoinds => val node = nodeConnectedWithBitcoind.node - val bitcoind = nodeConnectedWithBitcoind.bitcoind + val bitcoind = nodeConnectedWithBitcoind.bitcoinds(0) val assert1F = for { - _ <- node.isConnected.map(assert(_)) - a2 <- node.isInitialized.map(assert(_)) + _ <- node.isConnected(0).map(assert(_)) + a2 <- node.isInitialized(0).map(assert(_)) } yield a2 val hashF: Future[DoubleSha256DigestBE] = bitcoind.getNewAddress @@ -62,9 +81,9 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindNewest { } it must "stay in sync with a bitcoind instance for neutrino" in { - nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoind => + nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoinds => val node = nodeConnectedWithBitcoind.node - val bitcoind = nodeConnectedWithBitcoind.bitcoind + val bitcoind = nodeConnectedWithBitcoind.bitcoinds(0) //we need to generate 1 block for bitcoind to consider //itself out of IBD. bitcoind will not sendheaders diff --git a/node-test/src/test/scala/org/bitcoins/node/SpvNodeTest.scala b/node-test/src/test/scala/org/bitcoins/node/SpvNodeTest.scala index 185531a1da..a45f8a160b 100644 --- a/node-test/src/test/scala/org/bitcoins/node/SpvNodeTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/SpvNodeTest.scala @@ -42,8 +42,8 @@ class SpvNodeTest extends NodeTestWithCachedBitcoindNewest { val bitcoind = spvNodeConnectedWithBitcoind.bitcoind val assert1F = for { - _ <- spvNode.isConnected.map(assert(_)) - a2 <- spvNode.isInitialized.map(assert(_)) + _ <- spvNode.isConnected(0).map(assert(_)) + a2 <- spvNode.isInitialized(0).map(assert(_)) } yield a2 val hashF: Future[DoubleSha256DigestBE] = bitcoind.getNewAddress diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala b/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala index b7c26f8c00..5c3e5a373b 100644 --- a/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala @@ -32,7 +32,7 @@ class DataMessageHandlerTest extends NodeUnitTest { param: SpvNodeConnectedWithBitcoindV21 => val SpvNodeConnectedWithBitcoindV21(spv, _) = param - val sender = spv.peerMsgSender + val sender = spv.peerMsgSenders(0) for { chainApi <- spv.chainApiFromDb() dataMessageHandler = DataMessageHandler(chainApi)(spv.executionContext, @@ -66,7 +66,7 @@ class DataMessageHandlerTest extends NodeUnitTest { } } - val sender = spv.peerMsgSender + val sender = spv.peerMsgSenders(0) for { txId <- bitcoind.sendToAddress(junkAddress, 1.bitcoin) tx <- bitcoind.getRawTransactionRaw(txId) @@ -101,7 +101,7 @@ class DataMessageHandlerTest extends NodeUnitTest { () } } - val sender = spv.peerMsgSender + val sender = spv.peerMsgSenders(0) for { hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head) @@ -136,7 +136,7 @@ class DataMessageHandlerTest extends NodeUnitTest { } } - val sender = spv.peerMsgSender + val sender = spv.peerMsgSenders(0) for { hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head) header <- bitcoind.getBlockHeaderRaw(hash) @@ -169,7 +169,7 @@ class DataMessageHandlerTest extends NodeUnitTest { () } } - val sender = spv.peerMsgSender + val sender = spv.peerMsgSenders(0) for { hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head) filter <- bitcoind.getBlockFilter(hash, FilterType.Basic) diff --git a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala index 19ad3cbaaf..ab617aad94 100644 --- a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala +++ b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala @@ -18,7 +18,7 @@ import org.bitcoins.node.networking.peer.DataMessageHandler import scala.concurrent.Future case class NeutrinoNode( - nodePeer: Peer, + nodePeer: Vector[Peer], dataMessageHandler: DataMessageHandler, nodeConfig: NodeAppConfig, chainConfig: ChainAppConfig, @@ -34,7 +34,7 @@ case class NeutrinoNode( override def chainAppConfig: ChainAppConfig = chainConfig - override val peer: Peer = nodePeer + override val peers: Vector[Peer] = nodePeer override def updateDataMessageHandler( dataMessageHandler: DataMessageHandler): NeutrinoNode = { @@ -46,7 +46,7 @@ case class NeutrinoNode( node <- super.start() chainApi <- chainApiFromDb() bestHash <- chainApi.getBestBlockHash() - _ <- peerMsgSender.sendGetCompactFilterCheckPointMessage( + _ <- peerMsgSenders(0).sendGetCompactFilterCheckPointMessage( stopHash = bestHash.flip) } yield { node.asInstanceOf[NeutrinoNode] @@ -75,7 +75,7 @@ case class NeutrinoNode( blockchains <- blockchainsF // Get all of our cached headers in case of a reorg cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE.flip) - _ <- peerMsgSender.sendGetHeadersMessage(cachedHeaders) + _ <- peerMsgSenders(0).sendGetHeadersMessage(cachedHeaders) _ <- syncFilters(bestFilterHeaderOpt = bestFilterHeaderOpt, bestFilterOpt = bestFilterOpt, bestBlockHeader = header, @@ -128,7 +128,7 @@ case class NeutrinoNode( chainApi: ChainApi, bestFilterOpt: Option[CompactFilterDb]): Future[Unit] = { val sendCompactFilterHeaderMsgF = { - peerMsgSender.sendNextGetCompactFilterHeadersCommand( + peerMsgSenders(0).sendNextGetCompactFilterHeadersCommand( chainApi = chainApi, filterHeaderBatchSize = chainConfig.filterHeaderBatchSize, prevStopHash = bestFilterHeader.blockHashBE) @@ -143,7 +143,7 @@ case class NeutrinoNode( //means we are not syncing filter headers, and our filters are NOT //in sync with our compact filter headers logger.info(s"Starting sync filters in NeutrinoNode.sync()") - peerMsgSender + peerMsgSenders(0) .sendNextGetCompactFilterCommand( chainApi = chainApi, filterBatchSize = chainConfig.filterBatchSize, diff --git a/node/src/main/scala/org/bitcoins/node/Node.scala b/node/src/main/scala/org/bitcoins/node/Node.scala index 179e0ad608..e239d477ba 100644 --- a/node/src/main/scala/org/bitcoins/node/Node.scala +++ b/node/src/main/scala/org/bitcoins/node/Node.scala @@ -43,7 +43,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { implicit def executionContext: ExecutionContext = system.dispatcher - val peer: Peer + val peers: Vector[Peer] /** The current data message handler. * It should be noted that the dataMessageHandler contains @@ -74,17 +74,20 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { * object. Internally in [[org.bitcoins.node.networking.P2PClient p2p client]] you will see that * the [[ChainApi chain api]] is updated inside of the p2p client */ - lazy val client: P2PClient = { - val peerMsgRecv: PeerMessageReceiver = - PeerMessageReceiver.newReceiver(node = this, peer = peer) - val p2p = P2PClient(context = system, - peer = peer, - peerMessageReceiver = peerMsgRecv) + lazy val clients: Vector[P2PClient] = { + val peerMsgRecvs: Vector[PeerMessageReceiver] = + peers.map(x => PeerMessageReceiver.newReceiver(node = this, peer = x)) + val zipped = peers.zip(peerMsgRecvs) + val p2p = zipped.map { case (peer, peerMsgRecv) => + P2PClient(context = system, + peer = peer, + peerMessageReceiver = peerMsgRecv) + } p2p } - lazy val peerMsgSender: PeerMessageSender = { - PeerMessageSender(client) + lazy val peerMsgSenders: Vector[PeerMessageSender] = { + clients.map(PeerMessageSender(_)) } /** Sends the given P2P to our peer. @@ -92,22 +95,23 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { * with P2P messages, therefore marked as * `private[node]`. */ - def send(msg: NetworkPayload): Future[Unit] = { - peerMsgSender.sendMsg(msg) + def send(msg: NetworkPayload, idx: Int): Future[Unit] = { + peerMsgSenders(idx).sendMsg(msg) } /** Checks if we have a tcp connection with our peer */ - def isConnected: Future[Boolean] = peerMsgSender.isConnected() + def isConnected(idx: Int): Future[Boolean] = peerMsgSenders(idx).isConnected() /** Checks if we are fully initialized with our peer and have executed the handshake * This means we can now send arbitrary messages to our peer * * @return */ - def isInitialized: Future[Boolean] = peerMsgSender.isInitialized() + def isInitialized(idx: Int): Future[Boolean] = + peerMsgSenders(idx).isInitialized() - def isDisconnected: Future[Boolean] = - peerMsgSender.isDisconnected() + def isDisconnected(idx: Int): Future[Boolean] = + peerMsgSenders(idx).isDisconnected() /** Starts our node */ def start(): Future[Node] = { @@ -122,18 +126,18 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { val chainApiF = startConfsF.flatMap(_ => chainApiFromDb()) val startNodeF = { - peerMsgSender.connect() + peerMsgSenders.foreach(_.connect()) val isInitializedF = for { - _ <- AsyncUtil.retryUntilSatisfiedF(() => isInitialized, + _ <- AsyncUtil.retryUntilSatisfiedF(() => isInitialized(0), maxTries = 200, interval = 250.millis) } yield () isInitializedF.failed.foreach(err => - logger.error(s"Failed to connect with peer=$peer with err=$err")) + logger.error(s"Failed to connect with peer=${peers(0)} with err=$err")) isInitializedF.map { _ => - logger.info(s"Our peer=$peer has been initialized") + logger.info(s"Our peer=${peers(0)} has been initialized") logger.info(s"Our node has been full started. It took=${System .currentTimeMillis() - start}ms") this @@ -164,16 +168,25 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { /** Stops our node */ def stop(): Future[Node] = { logger.info(s"Stopping node") + + val disconnectFs = peerMsgSenders.map(_.disconnect()) + val disconnectF = for { - disconnect <- peerMsgSender.disconnect() + disconnect <- Future.sequence(disconnectFs) _ <- nodeAppConfig.stop() } yield disconnect + def isAllDisconnectedF: Future[Boolean] = { + val connF = peerMsgSenders.indices.map(peerMsgSenders(_).isDisconnected()) + val res = Future.sequence(connF).map(_.forall(_ == true)) + res + } + val start = System.currentTimeMillis() val isStoppedF = disconnectF.flatMap { _ => logger.info(s"Awaiting disconnect") //25 seconds to disconnect - AsyncUtil.retryUntilSatisfiedF(() => isDisconnected, 500.millis) + AsyncUtil.retryUntilSatisfiedF(() => isAllDisconnectedF, 500.millis) } isStoppedF.failed.foreach { e => @@ -204,7 +217,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { // Get all of our cached headers in case of a reorg cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE.flip) - _ <- peerMsgSender.sendGetHeadersMessage(cachedHeaders) + _ <- peerMsgSenders(0).sendGetHeadersMessage(cachedHeaders) } yield { logger.info( s"Starting sync node, height=${header.height} hash=${header.hashBE}") @@ -231,15 +244,15 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { for { _ <- addToDbF - connected <- isConnected + connected <- isConnected(0) res <- { if (connected) { logger.info(s"Sending out tx message for tx=$txIds") - peerMsgSender.sendInventoryMessage(transactions: _*) + peerMsgSenders(0).sendInventoryMessage(transactions: _*) } else { Future.failed(new RuntimeException( - s"Error broadcasting transaction $txIds, peer is disconnected $peer")) + s"Error broadcasting transaction $txIds, peer is disconnected ${peers(0)}")) } } } yield res @@ -252,8 +265,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { if (blockHashes.isEmpty) { Future.unit } else { - peerMsgSender.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock, - blockHashes: _*) + peerMsgSenders(0).sendGetDataMessage(TypeIdentifier.MsgWitnessBlock, + blockHashes: _*) } } diff --git a/node/src/main/scala/org/bitcoins/node/SpvNode.scala b/node/src/main/scala/org/bitcoins/node/SpvNode.scala index 2d48d73951..5ff203316e 100644 --- a/node/src/main/scala/org/bitcoins/node/SpvNode.scala +++ b/node/src/main/scala/org/bitcoins/node/SpvNode.scala @@ -15,7 +15,7 @@ import org.bitcoins.node.networking.peer.DataMessageHandler import scala.concurrent.Future case class SpvNode( - nodePeer: Peer, + nodePeer: Vector[Peer], dataMessageHandler: DataMessageHandler, nodeConfig: NodeAppConfig, chainConfig: ChainAppConfig, @@ -30,7 +30,7 @@ case class SpvNode( override def chainAppConfig: ChainAppConfig = chainConfig - override val peer: Peer = nodePeer + override val peers: Vector[Peer] = nodePeer private val _bloomFilter = new Mutable(BloomFilter.empty) @@ -58,8 +58,8 @@ case class SpvNode( // then need to calculate all the new elements in // the filter. this is easier:-) for { - _ <- peerMsgSender.sendFilterClearMessage() - _ <- peerMsgSender.sendFilterLoadMessage(newBloom) + _ <- peerMsgSenders(0).sendFilterClearMessage() + _ <- peerMsgSenders(0).sendFilterLoadMessage(newBloom) } yield this } @@ -73,7 +73,7 @@ case class SpvNode( val hash = address.hash _bloomFilter.atomicUpdate(hash)(_.insert(_)) - val sentFilterAddF = peerMsgSender.sendFilterAddMessage(hash) + val sentFilterAddF = peerMsgSenders(0).sendFilterAddMessage(hash) sentFilterAddF.map(_ => this) } @@ -81,10 +81,10 @@ case class SpvNode( override def start(): Future[SpvNode] = { for { node <- super.start() - _ <- AsyncUtil.retryUntilSatisfiedF(() => isConnected) - _ <- peerMsgSender.sendFilterLoadMessage(bloomFilter) + _ <- AsyncUtil.retryUntilSatisfiedF(() => isConnected(0)) + _ <- peerMsgSenders(0).sendFilterLoadMessage(bloomFilter) } yield { - logger.info(s"Sending bloomfilter=${bloomFilter.hex} to $peer") + logger.info(s"Sending bloomfilter=${bloomFilter.hex} to ${peers(0)}") node.asInstanceOf[SpvNode] } } diff --git a/node/src/main/scala/org/bitcoins/node/config/NodeAppConfig.scala b/node/src/main/scala/org/bitcoins/node/config/NodeAppConfig.scala index 3413a148ae..79e5c3604b 100644 --- a/node/src/main/scala/org/bitcoins/node/config/NodeAppConfig.scala +++ b/node/src/main/scala/org/bitcoins/node/config/NodeAppConfig.scala @@ -115,10 +115,10 @@ case class NodeAppConfig( } /** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */ - def createNode(peer: Peer)( + def createNode(peers: Vector[Peer])( chainConf: ChainAppConfig, system: ActorSystem): Future[Node] = { - NodeAppConfig.createNode(peer)(this, chainConf, system) + NodeAppConfig.createNode(peers)(this, chainConf, system) } } @@ -134,7 +134,7 @@ object NodeAppConfig extends AppConfigFactory[NodeAppConfig] { NodeAppConfig(datadir, confs: _*) /** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */ - def createNode(peer: Peer)(implicit + def createNode(peers: Vector[Peer])(implicit nodeConf: NodeAppConfig, chainConf: ChainAppConfig, system: ActorSystem): Future[Node] = { @@ -150,9 +150,9 @@ object NodeAppConfig extends AppConfigFactory[NodeAppConfig] { nodeConf.nodeType match { case NodeType.SpvNode => - dmhF.map(dmh => SpvNode(peer, dmh, nodeConf, chainConf, system)) + dmhF.map(dmh => SpvNode(peers, dmh, nodeConf, chainConf, system)) case NodeType.NeutrinoNode => - dmhF.map(dmh => NeutrinoNode(peer, dmh, nodeConf, chainConf, system)) + dmhF.map(dmh => NeutrinoNode(peers, dmh, nodeConf, chainConf, system)) case NodeType.FullNode => Future.failed(new RuntimeException("Not implemented")) case NodeType.BitcoindBackend => diff --git a/node/src/main/scala/org/bitcoins/node/util/BitcoinSNodeUtil.scala b/node/src/main/scala/org/bitcoins/node/util/BitcoinSNodeUtil.scala index 43775bb0a9..d3c5e68c30 100644 --- a/node/src/main/scala/org/bitcoins/node/util/BitcoinSNodeUtil.scala +++ b/node/src/main/scala/org/bitcoins/node/util/BitcoinSNodeUtil.scala @@ -1,5 +1,7 @@ package org.bitcoins.node.util +import scala.util.Random + object BitcoinSNodeUtil { /** Creates a unique actor name for a actor @@ -7,7 +9,7 @@ object BitcoinSNodeUtil { * @return */ def createActorName(className: String): String = { - s"${className}-${System.currentTimeMillis()}" + s"$className-${System.currentTimeMillis()}-${Random.nextLong()}" } /** Creates a unique actor name for a given class diff --git a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestWithCachedBitcoind.scala b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestWithCachedBitcoind.scala index 90486a2454..c4f7de0d36 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestWithCachedBitcoind.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeTestWithCachedBitcoind.scala @@ -8,13 +8,14 @@ import org.bitcoins.rpc.client.v21.BitcoindV21RpcClient import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.testkit.node.NodeUnitTest.{createPeer, syncNeutrinoNode} import org.bitcoins.testkit.node.fixture.{ - NeutrinoNodeConnectedWithBitcoind, + NeutrinoNodeConnectedWithBitcoinds, SpvNodeConnectedWithBitcoind, SpvNodeConnectedWithBitcoindV21 } import org.bitcoins.testkit.rpc.{ CachedBitcoind, CachedBitcoindNewest, + CachedBitcoindPairV21, CachedBitcoindV19 } import org.bitcoins.testkit.wallet.BitcoinSWalletTest @@ -75,25 +76,26 @@ trait NodeTestWithCachedBitcoind extends BaseNodeTest { _: CachedBitcoind[_] => })(test) } - def withNeutrinoNodeConnectedToBitcoind( + def withNeutrinoNodeConnectedToBitcoinds( test: OneArgAsyncTest, - bitcoind: BitcoindRpcClient)(implicit + bitcoinds: Vector[BitcoindRpcClient])(implicit system: ActorSystem, appConfig: BitcoinSAppConfig): FutureOutcome = { val nodeWithBitcoindBuilder: () => Future[ - NeutrinoNodeConnectedWithBitcoind] = { () => + NeutrinoNodeConnectedWithBitcoinds] = { () => require(appConfig.nodeType == NodeType.NeutrinoNode) for { - node <- NodeUnitTest.createNeutrinoNode(bitcoind)(system, - appConfig.chainConf, - appConfig.nodeConf) + node <- NodeUnitTest.createNeutrinoNode(bitcoinds)(system, + appConfig.chainConf, + appConfig.nodeConf) startedNode <- node.start() - syncedNode <- syncNeutrinoNode(startedNode, bitcoind) - } yield NeutrinoNodeConnectedWithBitcoind(syncedNode, bitcoind) + //is it enough to just sync with one bitcoind client for a test? + syncedNode <- syncNeutrinoNode(startedNode, bitcoinds(0)) + } yield NeutrinoNodeConnectedWithBitcoinds(syncedNode, bitcoinds) } - makeDependentFixture[NeutrinoNodeConnectedWithBitcoind]( + makeDependentFixture[NeutrinoNodeConnectedWithBitcoinds]( build = nodeWithBitcoindBuilder, - { case x: NeutrinoNodeConnectedWithBitcoind => + { case x: NeutrinoNodeConnectedWithBitcoinds => tearDownNode(x.node) })(test) } @@ -159,6 +161,16 @@ trait NodeTestWithCachedBitcoindNewest } } +trait NodeTestWithCachedBitcoindPair + extends NodeTestWithCachedBitcoind + with CachedBitcoindPairV21 { + + override def afterAll(): Unit = { + super[CachedBitcoindPairV21].afterAll() + super[NodeTestWithCachedBitcoind].afterAll() + } +} + trait NodeTestWithCachedBitcoindV19 extends NodeTestWithCachedBitcoind with CachedBitcoindV19 { diff --git a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala index fb97fa6124..c4b9859dc5 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala @@ -210,7 +210,7 @@ object NodeUnitTest extends P2PLogger { val dmh = DataMessageHandler(chainApi) - NeutrinoNode(peer, dmh, nodeConf, chainConf, system) + NeutrinoNode(Vector(peer), dmh, nodeConf, chainConf, system) } def buildPeerMessageReceiver(chainApi: ChainApi, peer: Peer)(implicit @@ -453,7 +453,7 @@ object NodeUnitTest extends P2PLogger { } yield { val dmh = DataMessageHandler(chainHandler) SpvNode( - nodePeer = peer, + nodePeer = Vector(peer), dataMessageHandler = dmh, nodeConfig = nodeAppConfig, chainConfig = chainAppConfig, @@ -483,7 +483,39 @@ object NodeUnitTest extends P2PLogger { chainApi <- chainApiF } yield { val dmh = DataMessageHandler(chainApi) - NeutrinoNode(nodePeer = peer, + NeutrinoNode(nodePeer = Vector(peer), + dataMessageHandler = dmh, + nodeConfig = nodeAppConfig, + chainConfig = chainAppConfig, + actorSystem = system) + } + + nodeF + } + + /** Creates a Neutrino node peered with the given bitcoind client, this method + * also calls [[org.bitcoins.node.Node.start() start]] to start the node + */ + def createNeutrinoNode(bitcoinds: Vector[BitcoindRpcClient])(implicit + system: ActorSystem, + chainAppConfig: ChainAppConfig, + nodeAppConfig: NodeAppConfig): Future[NeutrinoNode] = { + import system.dispatcher + + val checkConfigF = Future { + assert(nodeAppConfig.nodeType == NodeType.NeutrinoNode) + } + val chainApiF = for { + _ <- checkConfigF + chainHandler <- ChainUnitTest.createChainHandler() + } yield chainHandler + val peersF = bitcoinds.map(createPeer(_)) + val nodeF = for { + chainApi <- chainApiF + peers <- Future.sequence(peersF) + } yield { + val dmh = DataMessageHandler(chainApi) + NeutrinoNode(nodePeer = peers, dataMessageHandler = dmh, nodeConfig = nodeAppConfig, chainConfig = chainAppConfig, diff --git a/testkit/src/main/scala/org/bitcoins/testkit/node/fixture/NodeConnectedWithBitcoind.scala b/testkit/src/main/scala/org/bitcoins/testkit/node/fixture/NodeConnectedWithBitcoind.scala index dacb261f14..0bd2ee11d7 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/node/fixture/NodeConnectedWithBitcoind.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/node/fixture/NodeConnectedWithBitcoind.scala @@ -24,3 +24,13 @@ case class NeutrinoNodeConnectedWithBitcoind( node: NeutrinoNode, bitcoind: BitcoindRpcClient) extends NodeConnectedWithBitcoind + +trait NodeConnectedWithBitcoinds { + def node: Node + def bitcoinds: Vector[BitcoindRpcClient] +} + +case class NeutrinoNodeConnectedWithBitcoinds( + node: NeutrinoNode, + bitcoinds: Vector[BitcoindRpcClient] +) extends NodeConnectedWithBitcoinds diff --git a/testkit/src/main/scala/org/bitcoins/testkit/rpc/CachedBitcoind.scala b/testkit/src/main/scala/org/bitcoins/testkit/rpc/CachedBitcoind.scala index 3a0ab604c8..b944f8aa0a 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/rpc/CachedBitcoind.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/rpc/CachedBitcoind.scala @@ -189,6 +189,25 @@ trait CachedBitcoindPair[T <: BitcoindRpcClient] } } +trait CachedBitcoindPairV21 + extends CachedBitcoindCollection[BitcoindV21RpcClient] { + _: BitcoinSAkkaAsyncTest => + + override val version: BitcoindVersion = BitcoindVersion.V21 + + lazy val clientsF: Future[NodePair[BitcoindV21RpcClient]] = { + BitcoindRpcTestUtil + .createNodePair[BitcoindV21RpcClient](version) + .map(NodePair.fromTuple) + .map { tuple => + isClientsUsed.set(true) + val clients = cachedClients.get() + cachedClients.set(clients ++ tuple.toVector) + tuple + } + } +} + trait CachedBitcoindTriple[T <: BitcoindRpcClient] extends CachedBitcoindCollection[T] { _: BitcoinSAkkaAsyncTest => @@ -204,5 +223,4 @@ trait CachedBitcoindTriple[T <: BitcoindRpcClient] triple } } - }