diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala index ec5a7181b6..2bbb77f5bf 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala @@ -61,7 +61,7 @@ class BitcoinSServerMain(override val args: Array[String]) //get a node that isn't started val nodeF = configInitializedF.flatMap { _ => - nodeConf.createNode(peer, None)(chainConf, system) + nodeConf.createNode(peer)(chainConf, system) } //get our wallet diff --git a/docs/node/node.md b/docs/node/node.md index dcd0a131cd..d3ed588d08 100644 --- a/docs/node/node.md +++ b/docs/node/node.md @@ -44,6 +44,7 @@ For your node to be able to service these filters you will need set import akka.actor.ActorSystem import org.bitcoins.core.protocol.blockchain.Block import org.bitcoins.node._ +import org.bitcoins.node.networking.peer._ import org.bitcoins.rpc.client.common.BitcoindVersion import org.bitcoins.testkit.node._ import org.bitcoins.testkit.node.fixture._ @@ -108,14 +109,15 @@ val chainApiF = for { //yay! All setup done, let's create a node and then start it! val nodeF = for { - _ <- chainApiF + chainApi <- chainApiF peer <- peerF } yield { + val dataMessageHandler = DataMessageHandler(chainApi) NeutrinoNode(nodePeer = peer, + dataMessageHandler = dataMessageHandler, nodeConfig = nodeConfig, chainConfig = chainConfig, - actorSystem = system, - initialSyncDone = None) + actorSystem = system) } //let's start it @@ -155,4 +157,4 @@ val cleanupF = for { } yield () Await.result(cleanupF, 60.seconds) -``` \ No newline at end of file +``` diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/P2PClientTest.scala b/node-test/src/test/scala/org/bitcoins/node/networking/P2PClientTest.scala index 2fb80987c6..0f471b0999 100644 --- a/node-test/src/test/scala/org/bitcoins/node/networking/P2PClientTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/networking/P2PClientTest.scala @@ -11,7 +11,11 @@ import org.bitcoins.crypto.{CryptoUtil, DoubleSha256Digest} import org.bitcoins.node.models.Peer import org.bitcoins.node.networking.peer.PeerMessageReceiver import org.bitcoins.testkit.async.TestAsyncUtil -import org.bitcoins.testkit.node.{CachedBitcoinSAppConfig, NodeTestUtil} +import org.bitcoins.testkit.node.{ + CachedBitcoinSAppConfig, + NodeTestUtil, + NodeUnitTest +} import org.bitcoins.testkit.rpc.BitcoindRpcTestUtil import org.bitcoins.testkit.util.BitcoindRpcTest import org.scalatest._ @@ -168,7 +172,9 @@ class P2PClientTest extends BitcoindRpcTest with CachedBitcoinSAppConfig { val probe = TestProbe() val remote = peer.socket val peerMessageReceiverF = - PeerMessageReceiver.preConnection(peer, None) + for { + node <- NodeUnitTest.buildNode(peer) + } yield PeerMessageReceiver.preConnection(peer, node) val clientActorF: Future[TestActorRef[P2PClientActor]] = peerMessageReceiverF.map { peerMsgRecv => diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala b/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala index 8f06157212..3efb7654a0 100644 --- a/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala @@ -32,8 +32,8 @@ class DataMessageHandlerTest extends NodeUnitTest { param: SpvNodeConnectedWithBitcoindV19 => val SpvNodeConnectedWithBitcoindV19(spv, _) = param + val sender = spv.peerMsgSender for { - sender <- spv.peerMsgSenderF chainApi <- spv.chainApiFromDb() dataMessageHandler = DataMessageHandler(chainApi)(spv.executionContext, spv.nodeAppConfig, @@ -66,9 +66,8 @@ class DataMessageHandlerTest extends NodeUnitTest { } } + val sender = spv.peerMsgSender for { - sender <- spv.peerMsgSenderF - txId <- bitcoind.sendToAddress(junkAddress, 1.bitcoin) tx <- bitcoind.getRawTransactionRaw(txId) _ <- bitcoind.generateToAddress(blocks = 1, junkAddress) @@ -102,9 +101,9 @@ class DataMessageHandlerTest extends NodeUnitTest { () } } - for { - sender <- spv.peerMsgSenderF + val sender = spv.peerMsgSender + for { hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head) block <- bitcoind.getBlockRaw(hash) @@ -137,9 +136,8 @@ class DataMessageHandlerTest extends NodeUnitTest { } } + val sender = spv.peerMsgSender for { - sender <- spv.peerMsgSenderF - hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head) header <- bitcoind.getBlockHeaderRaw(hash) @@ -171,9 +169,8 @@ class DataMessageHandlerTest extends NodeUnitTest { () } } + val sender = spv.peerMsgSender for { - sender <- spv.peerMsgSenderF - hash <- bitcoind.generateToAddress(blocks = 1, junkAddress).map(_.head) filter <- bitcoind.getBlockFilter(hash, FilterType.Basic) diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/peer/PeerMessageHandlerTest.scala b/node-test/src/test/scala/org/bitcoins/node/networking/peer/PeerMessageHandlerTest.scala index fe149c3a38..0d195b2b6a 100644 --- a/node-test/src/test/scala/org/bitcoins/node/networking/peer/PeerMessageHandlerTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/networking/peer/PeerMessageHandlerTest.scala @@ -5,6 +5,7 @@ import org.bitcoins.node.models.Peer import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.testkit.BitcoinSTestAppConfig import org.bitcoins.testkit.async.TestAsyncUtil +import org.bitcoins.testkit.chain.ChainUnitTest import org.bitcoins.testkit.node.{ CachedBitcoinSAppConfig, NodeTestWithCachedBitcoindNewest, @@ -12,7 +13,7 @@ import org.bitcoins.testkit.node.{ } import org.scalatest.{FutureOutcome, Outcome} -import scala.concurrent.Future +import scala.concurrent.{Await, Future} import scala.concurrent.duration.DurationInt /** Created by chris on 7/1/16. @@ -39,35 +40,35 @@ class PeerMessageHandlerTest implicit protected lazy val chainConfig: ChainAppConfig = cachedConfig.chainConf + override def beforeAll(): Unit = { + val setupF = ChainUnitTest.setupHeaderTableWithGenesisHeader() + Await.result(setupF, duration) + () + } + + override def afterAll(): Unit = { + super[CachedBitcoinSAppConfig].afterAll() + super[NodeTestWithCachedBitcoindNewest].afterAll() + } + behavior of "PeerHandler" it must "be able to fully initialize a PeerMessageReceiver" in { peer => - val peerHandlerF = NodeUnitTest.buildPeerHandler(peer) - val peerMsgSenderF = peerHandlerF.map(_.peerMsgSender) - val p2pClientF = peerHandlerF.map(_.p2pClient) + for { + peerHandler <- NodeUnitTest.buildPeerHandler(peer) + peerMsgSender = peerHandler.peerMsgSender + p2pClient = peerHandler.p2pClient - val _ = peerHandlerF.map(_.peerMsgSender.connect()) - val isConnectedF = TestAsyncUtil.retryUntilSatisfiedF( - () => p2pClientF.flatMap(_.isConnected()), - interval = 500.millis - ) + _ = peerMsgSender.connect() - val isInitF = isConnectedF.flatMap { _ => - TestAsyncUtil.retryUntilSatisfiedF(() => - p2pClientF.flatMap(_.isInitialized())) - } + _ <- TestAsyncUtil.retryUntilSatisfiedF(() => p2pClient.isConnected(), + interval = 500.millis) - val disconnectF = isInitF.flatMap { _ => - peerMsgSenderF.map(_.disconnect()) - } + _ <- TestAsyncUtil.retryUntilSatisfiedF(() => p2pClient.isInitialized()) + _ <- peerMsgSender.disconnect() - val isDisconnectedF = disconnectF.flatMap { _ => - TestAsyncUtil.retryUntilSatisfiedF(() => - p2pClientF.flatMap(_.isDisconnected())) - - } - - isDisconnectedF.map(_ => succeed) + _ <- TestAsyncUtil.retryUntilSatisfiedF(() => p2pClient.isDisconnected()) + } yield succeed } /* diff --git a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala index 5c96f2a8d3..ba93c88377 100644 --- a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala +++ b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala @@ -1,6 +1,5 @@ package org.bitcoins.node -import akka.Done import akka.actor.ActorSystem import org.bitcoins.chain.config.ChainAppConfig import org.bitcoins.chain.models.BlockHeaderDAO @@ -8,14 +7,15 @@ import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse import org.bitcoins.core.protocol.BlockStamp import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.models.Peer +import org.bitcoins.node.networking.peer.DataMessageHandler -import scala.concurrent.{Future, Promise} +import scala.concurrent.Future case class NeutrinoNode( nodePeer: Peer, + dataMessageHandler: DataMessageHandler, nodeConfig: NodeAppConfig, chainConfig: ChainAppConfig, - initialSyncDone: Option[Promise[Done]], actorSystem: ActorSystem) extends Node { require( @@ -30,12 +30,16 @@ case class NeutrinoNode( override val peer: Peer = nodePeer + override def updateDataMessageHandler( + dataMessageHandler: DataMessageHandler): NeutrinoNode = { + copy(dataMessageHandler = dataMessageHandler) + } + override def start(): Future[NeutrinoNode] = { val res = for { node <- super.start() chainApi <- chainApiFromDb() bestHash <- chainApi.getBestBlockHash() - peerMsgSender <- peerMsgSenderF _ <- peerMsgSender.sendGetCompactFilterCheckPointMessage( stopHash = bestHash.flip) } yield { @@ -62,7 +66,6 @@ case class NeutrinoNode( header <- chainApi.getBestBlockHeader() filterHeaderCount <- chainApi.getFilterHeaderCount() filterCount <- chainApi.getFilterCount() - peerMsgSender <- peerMsgSenderF blockchains <- blockchainsF } yield { // Get all of our cached headers in case of a reorg @@ -101,5 +104,4 @@ case class NeutrinoNode( startHeight: Int, endHeight: Int): Future[Vector[FilterResponse]] = chainApiFromDb().flatMap(_.getFiltersBetweenHeights(startHeight, endHeight)) - } diff --git a/node/src/main/scala/org/bitcoins/node/Node.scala b/node/src/main/scala/org/bitcoins/node/Node.scala index 102ae82a74..f3f95896ef 100644 --- a/node/src/main/scala/org/bitcoins/node/Node.scala +++ b/node/src/main/scala/org/bitcoins/node/Node.scala @@ -1,6 +1,5 @@ package org.bitcoins.node -import akka.Done import akka.actor.ActorSystem import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.chain.blockchain.ChainHandlerCached @@ -23,12 +22,13 @@ import org.bitcoins.node.models.{ } import org.bitcoins.node.networking.P2PClient import org.bitcoins.node.networking.peer.{ + DataMessageHandler, PeerMessageReceiver, PeerMessageSender } import scala.concurrent.duration.DurationInt -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} /** This a base trait for various kinds of nodes. It contains house keeping methods required for all nodes. @@ -45,12 +45,20 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { val peer: Peer - protected val initialSyncDone: Option[Promise[Done]] + /** The current data message handler. + * It should be noted that the dataMessageHandler contains + * chainstate. When we update with a new chainstate, we need to + * maek sure we update the [[DataMessageHandler]] via [[updateDataMessageHandler()]] + * to make sure we don't corrupt our chainstate cache + */ + def dataMessageHandler: DataMessageHandler def nodeCallbacks: NodeCallbacks = nodeAppConfig.nodeCallbacks lazy val txDAO: BroadcastAbleTransactionDAO = BroadcastAbleTransactionDAO() + def updateDataMessageHandler(dataMessageHandler: DataMessageHandler): Node + /** This is constructing a chain api from disk every time we call this method * This involves database calls which can be slow and expensive to construct * our [[org.bitcoins.chain.blockchain.Blockchain Blockchain]] @@ -66,26 +74,17 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { * object. Internally in [[org.bitcoins.node.networking.P2PClient p2p client]] you will see that * the [[ChainApi chain api]] is updated inside of the p2p client */ - lazy val clientF: Future[P2PClient] = { - val chainApiF = chainApiFromDb() - for { - chainApi <- chainApiF - } yield { - val peerMsgRecv: PeerMessageReceiver = - PeerMessageReceiver.newReceiver(chainApi = chainApi, - peer = peer, - initialSyncDone = initialSyncDone) - val p2p = P2PClient(context = system, - peer = peer, - peerMessageReceiver = peerMsgRecv) - p2p - } + lazy val client: P2PClient = { + val peerMsgRecv: PeerMessageReceiver = + PeerMessageReceiver.newReceiver(node = this, peer = peer) + val p2p = P2PClient(context = system, + peer = peer, + peerMessageReceiver = peerMsgRecv) + p2p } - lazy val peerMsgSenderF: Future[PeerMessageSender] = { - clientF.map { client => - PeerMessageSender(client) - } + lazy val peerMsgSender: PeerMessageSender = { + PeerMessageSender(client) } /** Sends the given P2P to our peer. @@ -94,21 +93,21 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { * `private[node]`. */ def send(msg: NetworkPayload): Future[Unit] = { - peerMsgSenderF.flatMap(_.sendMsg(msg)) + peerMsgSender.sendMsg(msg) } /** Checks if we have a tcp connection with our peer */ - def isConnected: Future[Boolean] = peerMsgSenderF.flatMap(_.isConnected()) + def isConnected: Future[Boolean] = peerMsgSender.isConnected() /** Checks if we are fully initialized with our peer and have executed the handshake * This means we can now send arbitrary messages to our peer * * @return */ - def isInitialized: Future[Boolean] = peerMsgSenderF.flatMap(_.isInitialized()) + def isInitialized: Future[Boolean] = peerMsgSender.isInitialized() def isDisconnected: Future[Boolean] = - peerMsgSenderF.flatMap(_.isDisconnected()) + peerMsgSender.isDisconnected() /** Starts our node */ def start(): Future[Node] = { @@ -123,8 +122,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { val chainApiF = startConfsF.flatMap(_ => chainApiFromDb()) val startNodeF = { + peerMsgSender.connect() val isInitializedF = for { - _ <- peerMsgSenderF.map(_.connect()) _ <- AsyncUtil.retryUntilSatisfiedF(() => isInitialized, interval = 250.millis) } yield () @@ -165,8 +164,7 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { def stop(): Future[Node] = { logger.info(s"Stopping node") val disconnectF = for { - p <- peerMsgSenderF - disconnect <- p.disconnect() + disconnect <- peerMsgSender.disconnect() _ <- nodeAppConfig.stop() } yield disconnect @@ -202,11 +200,11 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { chainApi <- chainApiFromDb() header <- chainApi.getBestBlockHeader() blockchains <- blockchainsF - } yield { + // Get all of our cached headers in case of a reorg - val cachedHeaders = - blockchains.flatMap(_.headers).map(_.hashBE.flip) - peerMsgSenderF.map(_.sendGetHeadersMessage(cachedHeaders)) + cachedHeaders = blockchains.flatMap(_.headers).map(_.hashBE.flip) + _ <- peerMsgSender.sendGetHeadersMessage(cachedHeaders) + } yield { logger.info( s"Starting sync node, height=${header.height} hash=${header.hashBE}") } @@ -231,7 +229,6 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { for { _ <- addToDbF - peerMsgSender <- peerMsgSenderF connected <- isConnected @@ -254,11 +251,8 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { if (blockHashes.isEmpty) { Future.unit } else { - for { - peerMsgSender <- peerMsgSenderF - _ <- peerMsgSender.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock, - blockHashes: _*) - } yield () + peerMsgSender.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock, + blockHashes: _*) } } diff --git a/node/src/main/scala/org/bitcoins/node/SpvNode.scala b/node/src/main/scala/org/bitcoins/node/SpvNode.scala index 713e61778f..2d48d73951 100644 --- a/node/src/main/scala/org/bitcoins/node/SpvNode.scala +++ b/node/src/main/scala/org/bitcoins/node/SpvNode.scala @@ -1,6 +1,5 @@ package org.bitcoins.node -import akka.Done import akka.actor.ActorSystem import org.bitcoins.asyncutil.AsyncUtil import org.bitcoins.chain.config.ChainAppConfig @@ -11,14 +10,15 @@ import org.bitcoins.core.protocol.{BitcoinAddress, BlockStamp} import org.bitcoins.core.util.Mutable import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.models.Peer +import org.bitcoins.node.networking.peer.DataMessageHandler -import scala.concurrent.{Future, Promise} +import scala.concurrent.Future case class SpvNode( nodePeer: Peer, + dataMessageHandler: DataMessageHandler, nodeConfig: NodeAppConfig, chainConfig: ChainAppConfig, - initialSyncDone: Option[Promise[Done]], actorSystem: ActorSystem) extends Node { require(nodeConfig.nodeType == NodeType.SpvNode, @@ -41,6 +41,11 @@ case class SpvNode( this } + override def updateDataMessageHandler( + dataMessageHandler: DataMessageHandler): SpvNode = { + copy(dataMessageHandler = dataMessageHandler) + } + /** Updates our bloom filter to match the given TX * * @return SPV node with the updated bloom filter @@ -53,9 +58,8 @@ case class SpvNode( // then need to calculate all the new elements in // the filter. this is easier:-) for { - p <- peerMsgSenderF - _ <- p.sendFilterClearMessage() - _ <- p.sendFilterLoadMessage(newBloom) + _ <- peerMsgSender.sendFilterClearMessage() + _ <- peerMsgSender.sendFilterLoadMessage(newBloom) } yield this } @@ -69,7 +73,7 @@ case class SpvNode( val hash = address.hash _bloomFilter.atomicUpdate(hash)(_.insert(_)) - val sentFilterAddF = peerMsgSenderF.flatMap(_.sendFilterAddMessage(hash)) + val sentFilterAddF = peerMsgSender.sendFilterAddMessage(hash) sentFilterAddF.map(_ => this) } @@ -77,7 +81,6 @@ case class SpvNode( override def start(): Future[SpvNode] = { for { node <- super.start() - peerMsgSender <- peerMsgSenderF _ <- AsyncUtil.retryUntilSatisfiedF(() => isConnected) _ <- peerMsgSender.sendFilterLoadMessage(bloomFilter) } yield { diff --git a/node/src/main/scala/org/bitcoins/node/config/NodeAppConfig.scala b/node/src/main/scala/org/bitcoins/node/config/NodeAppConfig.scala index 28dd874c3d..098937c078 100644 --- a/node/src/main/scala/org/bitcoins/node/config/NodeAppConfig.scala +++ b/node/src/main/scala/org/bitcoins/node/config/NodeAppConfig.scala @@ -1,17 +1,23 @@ package org.bitcoins.node.config -import akka.Done import akka.actor.ActorSystem import com.typesafe.config.Config +import org.bitcoins.chain.blockchain.ChainHandlerCached import org.bitcoins.chain.config.ChainAppConfig +import org.bitcoins.chain.models.{ + BlockHeaderDAO, + CompactFilterDAO, + CompactFilterHeaderDAO +} import org.bitcoins.core.util.Mutable import org.bitcoins.db.{AppConfigFactory, DbAppConfig, JdbcProfileComponent} import org.bitcoins.node._ import org.bitcoins.node.db.NodeDbManagement import org.bitcoins.node.models.Peer +import org.bitcoins.node.networking.peer.DataMessageHandler import java.nio.file.Path -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.{ExecutionContext, Future} /** Configuration for the Bitcoin-S node * @param directory The data directory of the node @@ -82,10 +88,10 @@ case class NodeAppConfig( } /** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */ - def createNode(peer: Peer, initialSyncDone: Option[Promise[Done]])( + def createNode(peer: Peer)( chainConf: ChainAppConfig, system: ActorSystem): Future[Node] = { - NodeAppConfig.createNode(peer, initialSyncDone)(this, chainConf, system) + NodeAppConfig.createNode(peer)(this, chainConf, system) } } @@ -101,17 +107,25 @@ object NodeAppConfig extends AppConfigFactory[NodeAppConfig] { NodeAppConfig(datadir, confs: _*) /** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */ - def createNode(peer: Peer, initialSyncDone: Option[Promise[Done]])(implicit + def createNode(peer: Peer)(implicit nodeConf: NodeAppConfig, chainConf: ChainAppConfig, system: ActorSystem): Future[Node] = { + import system.dispatcher + + val blockHeaderDAO = BlockHeaderDAO() + val filterHeaderDAO = CompactFilterHeaderDAO() + val filterDAO = CompactFilterDAO() + + val dmhF = ChainHandlerCached + .fromDatabase(blockHeaderDAO, filterHeaderDAO, filterDAO) + .map(handler => DataMessageHandler(handler)) + nodeConf.nodeType match { case NodeType.SpvNode => - Future.successful( - SpvNode(peer, nodeConf, chainConf, initialSyncDone, system)) + dmhF.map(dmh => SpvNode(peer, dmh, nodeConf, chainConf, system)) case NodeType.NeutrinoNode => - Future.successful( - NeutrinoNode(peer, nodeConf, chainConf, initialSyncDone, system)) + dmhF.map(dmh => NeutrinoNode(peer, dmh, nodeConf, chainConf, system)) case NodeType.FullNode => Future.failed(new RuntimeException("Not implemented")) case NodeType.BitcoindBackend => diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala index 476caee266..059ac90917 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/PeerMessageReceiver.scala @@ -1,28 +1,14 @@ package org.bitcoins.node.networking.peer -import akka.Done import akka.actor.ActorRefFactory -import org.bitcoins.chain.blockchain.ChainHandlerCached -import org.bitcoins.chain.config.ChainAppConfig -import org.bitcoins.chain.models.{ - BlockHeaderDAO, - CompactFilterDAO, - CompactFilterHeaderDAO -} -import org.bitcoins.core.api.chain.ChainApi -import org.bitcoins.core.p2p.{NetworkMessage, _} +import org.bitcoins.core.p2p._ import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.models.Peer import org.bitcoins.node.networking.P2PClient -import org.bitcoins.node.networking.peer.PeerMessageReceiverState.{ - Disconnected, - Initializing, - Normal, - Preconnection -} -import org.bitcoins.node.{NodeType, P2PLogger} +import org.bitcoins.node.networking.peer.PeerMessageReceiverState._ +import org.bitcoins.node.{Node, NodeType, P2PLogger} -import scala.concurrent.{Future, Promise} +import scala.concurrent.Future /** Responsible for receiving messages from a peer on the * p2p network. This is called by [[org.bitcoins.rpc.client.common.Client Client]] when doing the p2p @@ -31,13 +17,10 @@ import scala.concurrent.{Future, Promise} * [[org.bitcoins.core.p2p.NetworkMessage NetworkMessage]] */ class PeerMessageReceiver( - dataMessageHandler: DataMessageHandler, + node: Node, val state: PeerMessageReceiverState, peer: Peer -)(implicit - ref: ActorRefFactory, - nodeAppConfig: NodeAppConfig, - chainAppConfig: ChainAppConfig) +)(implicit ref: ActorRefFactory, nodeAppConfig: NodeAppConfig) extends P2PLogger { import ref.dispatcher @@ -62,7 +45,7 @@ class PeerMessageReceiver( val peerMsgSender = PeerMessageSender(client) - peerMsgSender.sendVersionMessage(dataMessageHandler.chainApi) + peerMsgSender.sendVersionMessage(node.dataMessageHandler.chainApi) val newRecv = toState(newState) @@ -135,8 +118,9 @@ class PeerMessageReceiver( sender: PeerMessageSender): Future[PeerMessageReceiver] = { //else it means we are receiving this data payload from a peer, //we need to handle it - dataMessageHandler.handleDataPayload(payload, sender).map { handler => - new PeerMessageReceiver(handler, state, peer) + node.dataMessageHandler.handleDataPayload(payload, sender).map { handler => + val newNode = node.updateDataMessageHandler(handler) + new PeerMessageReceiver(newNode, state, peer) } } @@ -152,7 +136,7 @@ class PeerMessageReceiver( payload match { case versionMsg: VersionMessage => - logger.trace(s"Received versionMsg=${versionMsg}from peer=${peer}") + logger.trace(s"Received versionMsg=$versionMsg from peer=$peer") state match { case bad @ (_: Disconnected | _: Normal | Preconnection) => @@ -163,28 +147,6 @@ class PeerMessageReceiver( case good: Initializing => val newState = good.withVersionMsg(versionMsg) - // TODO: do not throw error once we have peer discovery - nodeAppConfig.nodeType match { - case NodeType.NeutrinoNode => - if (!versionMsg.services.nodeCompactFilters) { - val errMsg = - s"Connected Peer ($peer) does not support compact filters" - logger.warn(errMsg) - sys.error(errMsg) - } - case NodeType.SpvNode => - if (!versionMsg.services.nodeBloom) { - val errMsg = - s"Connected Peer ($peer) does not support bloom filters" - logger.warn(errMsg) - sys.error(errMsg) - } - case NodeType.FullNode => - sys.error("Not yet implemented.") - case NodeType.BitcoindBackend => - throw new RuntimeException("This is impossible") - } - sender.sendVerackMessage() val newRecv = toState(newState) @@ -235,7 +197,7 @@ class PeerMessageReceiver( /** Transitions our PeerMessageReceiver to a new state */ def toState(newState: PeerMessageReceiverState): PeerMessageReceiver = { new PeerMessageReceiver( - dataMessageHandler = dataMessageHandler, + node = node, state = newState, peer = peer ) @@ -255,59 +217,31 @@ object PeerMessageReceiver { case class NetworkMessageReceived(msg: NetworkMessage, client: P2PClient) extends PeerMessageReceiverMsg - def apply( - state: PeerMessageReceiverState, - chainApi: ChainApi, - peer: Peer, - initialSyncDone: Option[Promise[Done]])(implicit + def apply(state: PeerMessageReceiverState, node: Node, peer: Peer)(implicit ref: ActorRefFactory, - nodeAppConfig: NodeAppConfig, - chainAppConfig: ChainAppConfig + nodeAppConfig: NodeAppConfig ): PeerMessageReceiver = { - import ref.dispatcher - val dataHandler = DataMessageHandler(chainApi, initialSyncDone) - new PeerMessageReceiver(dataMessageHandler = dataHandler, - state = state, - peer = peer) + new PeerMessageReceiver(node = node, state = state, peer = peer) } /** Creates a peer message receiver that is ready * to be connected to a peer. This can be given to [[org.bitcoins.node.networking.P2PClient.props() P2PClient]] * to connect to a peer on the network */ - def preConnection(peer: Peer, initialSyncDone: Option[Promise[Done]])(implicit + def preConnection(peer: Peer, node: Node)(implicit ref: ActorRefFactory, - nodeAppConfig: NodeAppConfig, - chainAppConfig: ChainAppConfig - ): Future[PeerMessageReceiver] = { - import ref.dispatcher - val blockHeaderDAO = BlockHeaderDAO() - val filterHeaderDAO = CompactFilterHeaderDAO() - val filterDAO = CompactFilterDAO() - val chainHandlerF = - ChainHandlerCached.fromDatabase(blockHeaderDAO, - filterHeaderDAO, - filterDAO) - for { - chainHandler <- chainHandlerF - } yield { - PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), - chainApi = chainHandler, - peer = peer, - initialSyncDone = initialSyncDone) - } + nodeAppConfig: NodeAppConfig + ): PeerMessageReceiver = { + PeerMessageReceiver(node = node, + state = PeerMessageReceiverState.fresh(), + peer = peer) } - def newReceiver( - chainApi: ChainApi, - peer: Peer, - initialSyncDone: Option[Promise[Done]])(implicit + def newReceiver(node: Node, peer: Peer)(implicit nodeAppConfig: NodeAppConfig, - chainAppConfig: ChainAppConfig, ref: ActorRefFactory): PeerMessageReceiver = { PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), - chainApi = chainApi, - peer = peer, - initialSyncDone = initialSyncDone) + node = node, + peer = peer) } } diff --git a/testkit/src/main/scala/org/bitcoins/testkit/node/CachedAppConfig.scala b/testkit/src/main/scala/org/bitcoins/testkit/node/CachedAppConfig.scala index be654ad36c..136076c4ad 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/node/CachedAppConfig.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/node/CachedAppConfig.scala @@ -23,7 +23,7 @@ sealed trait CachedAppConfig { _: BitcoinSAkkaAsyncTest => trait CachedBitcoinSAppConfig { _: BitcoinSAkkaAsyncTest => implicit protected lazy val cachedConfig: BitcoinSAppConfig = - BitcoinSTestAppConfig.getSpvTestConfig() + BitcoinSTestAppConfig.getNeutrinoTestConfig() implicit protected lazy val cachedNodeConf: NodeAppConfig = { cachedConfig.nodeConf diff --git a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala index ed44d15219..25cde25707 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala @@ -1,17 +1,14 @@ package org.bitcoins.testkit.node import akka.actor.ActorSystem +import org.bitcoins.chain.blockchain.ChainHandlerCached import org.bitcoins.chain.config.ChainAppConfig +import org.bitcoins.chain.models._ import org.bitcoins.core.api.chain.ChainApi import org.bitcoins.node._ import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.models.Peer -import org.bitcoins.node.networking.peer.{ - PeerHandler, - PeerMessageReceiver, - PeerMessageReceiverState, - PeerMessageSender -} +import org.bitcoins.node.networking.peer._ import org.bitcoins.rpc.client.common.BitcoindVersion.{V18, V19} import org.bitcoins.rpc.client.common.{BitcoindRpcClient, BitcoindVersion} import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient @@ -189,14 +186,39 @@ trait NodeUnitTest extends BaseNodeTest { object NodeUnitTest extends P2PLogger { + def buildNode(peer: Peer)(implicit + chainConf: ChainAppConfig, + nodeConf: NodeAppConfig, + system: ActorSystem): Future[NeutrinoNode] = { + import system.dispatcher + + val blockHeaderDAO = BlockHeaderDAO() + val filterHeaderDAO = CompactFilterHeaderDAO() + val filterDAO = CompactFilterDAO() + + val chainApiF = ChainHandlerCached + .fromDatabase(blockHeaderDAO, filterHeaderDAO, filterDAO) + + chainApiF.map(buildNode(peer, _)) + } + + def buildNode(peer: Peer, chainApi: ChainApi)(implicit + chainConf: ChainAppConfig, + nodeConf: NodeAppConfig, + system: ActorSystem): NeutrinoNode = { + import system.dispatcher + + val dmh = DataMessageHandler(chainApi) + + NeutrinoNode(peer, dmh, nodeConf, chainConf, system) + } + def buildPeerMessageReceiver(chainApi: ChainApi, peer: Peer)(implicit appConfig: BitcoinSAppConfig, system: ActorSystem): Future[PeerMessageReceiver] = { - val receiver = - PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), - chainApi = chainApi, - peer = peer, - initialSyncDone = None) + val receiver = PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), + node = buildNode(peer, chainApi), + peer = peer) Future.successful(receiver) } @@ -205,9 +227,9 @@ object NodeUnitTest extends P2PLogger { chainAppConfig: ChainAppConfig, system: ActorSystem): Future[PeerHandler] = { import system.dispatcher - val chainApiF = ChainUnitTest.createChainHandler() - val peerMsgReceiverF = chainApiF.flatMap { _ => - PeerMessageReceiver.preConnection(peer, None) + val nodeF = buildNode(peer) + val peerMsgReceiverF = nodeF.map { node => + PeerMessageReceiver.preConnection(peer, node) } //the problem here is the 'self', this needs to be an ordinary peer message handler //that can handle the handshake @@ -394,9 +416,8 @@ object NodeUnitTest extends P2PLogger { system: ActorSystem): Future[PeerMessageReceiver] = { val receiver = PeerMessageReceiver(state = PeerMessageReceiverState.fresh(), - chainApi = chainApi, - peer = peer, - initialSyncDone = None) + node = buildNode(peer, chainApi), + peer = peer) Future.successful(receiver) } @@ -427,24 +448,20 @@ object NodeUnitTest extends P2PLogger { val checkConfigF = Future { assert(nodeAppConfig.nodeType == NodeType.SpvNode) } - val chainApiF = for { + + for { _ <- checkConfigF chainHandler <- ChainUnitTest.createChainHandler() - } yield chainHandler - val nodeF = for { - _ <- chainApiF } yield { + val dmh = DataMessageHandler(chainHandler) SpvNode( nodePeer = peer, + dataMessageHandler = dmh, nodeConfig = nodeAppConfig, chainConfig = chainAppConfig, - actorSystem = system, - initialSyncDone = None + actorSystem = system ).setBloomFilter(P2PMessageTestUtil.emptyBloomFilter) } - - nodeF - } /** Creates a Neutrino node peered with the given bitcoind client, this method @@ -465,13 +482,14 @@ object NodeUnitTest extends P2PLogger { } yield chainHandler val peer = createPeer(bitcoind) val nodeF = for { - _ <- chainApiF + chainApi <- chainApiF } yield { + val dmh = DataMessageHandler(chainApi) NeutrinoNode(nodePeer = peer, + dataMessageHandler = dmh, nodeConfig = nodeAppConfig, chainConfig = chainAppConfig, - actorSystem = system, - initialSyncDone = None) + actorSystem = system) } nodeF